diff options
Diffstat (limited to 'src')
-rwxr-xr-x | src/bp_sim.cpp | 7 | ||||
-rwxr-xr-x | src/bp_sim.h | 5 | ||||
-rw-r--r-- | src/debug.cpp | 12 | ||||
-rw-r--r-- | src/flow_stat.cpp | 234 | ||||
-rw-r--r-- | src/flow_stat.h | 27 | ||||
-rw-r--r-- | src/flow_stat_parser.cpp | 52 | ||||
-rw-r--r-- | src/flow_stat_parser.h | 36 | ||||
-rw-r--r-- | src/gtest/trex_stateless_gtest.cpp | 2 | ||||
-rw-r--r-- | src/internal_api/trex_platform_api.h | 13 | ||||
-rw-r--r-- | src/latency.cpp | 11 | ||||
-rw-r--r-- | src/latency.h | 4 | ||||
-rw-r--r-- | src/main_dpdk.cpp | 377 | ||||
-rw-r--r-- | src/main_dpdk.h | 10 | ||||
-rwxr-xr-x | src/msg_manager.cpp | 17 | ||||
-rwxr-xr-x | src/msg_manager.h | 26 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless.cpp | 1 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.cpp | 7 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.h | 100 | ||||
-rw-r--r-- | src/stateless/cp/trex_streams_compiler.cpp | 3 | ||||
-rw-r--r-- | src/stateless/dp/trex_stateless_dp_core.cpp | 16 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.cpp | 46 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.h | 80 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.cpp | 217 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.h | 80 |
24 files changed, 1022 insertions, 361 deletions
diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp index 6ea40be2..cc9af837 100755 --- a/src/bp_sim.cpp +++ b/src/bp_sim.cpp @@ -6039,8 +6039,13 @@ uint16_t CSimplePacketParser::getPktSize(){ return ( ip_len +m_vlan_offset+14); } +uint16_t CSimplePacketParser::getIpId() { + if (m_ipv4) { + return ( m_ipv4->getId() ); + } - + return (0); +} uint8_t CSimplePacketParser::getTTl(){ if (m_ipv4) { diff --git a/src/bp_sim.h b/src/bp_sim.h index 37ed7854..4b1a88e3 100755 --- a/src/bp_sim.h +++ b/src/bp_sim.h @@ -1246,9 +1246,10 @@ static inline int get_is_rx_check_mode(){ return (CGlobalInfo::m_options.preview.get_is_rx_check_enable() ?1:0); } -static inline bool get_is_rx_filter_enable(){//??? +static inline bool get_is_rx_filter_enable(){ uint32_t latency_rate=CGlobalInfo::m_options.m_latency_rate; - return ( ( get_is_rx_check_mode() || CGlobalInfo::is_learn_mode() || latency_rate != 0) ?true:false ); + return ( ( get_is_rx_check_mode() || CGlobalInfo::is_learn_mode() || latency_rate != 0 + || get_is_stateless()) ?true:false ); } static inline uint16_t get_rx_check_hops() { return (CGlobalInfo::m_options.m_rx_check_hops); diff --git a/src/debug.cpp b/src/debug.cpp index 902766a1..656549dc 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -415,11 +415,17 @@ int CTrexDebug::test_send(uint pkt_type) { lp->dump_stats_extended(stdout); } for (port_id = 0; port_id < m_max_ports; port_id++) { - uint64_t fdir_stat[MAX_FLOW_STATS]; + rx_per_flow_t fdir_stat[MAX_FLOW_STATS]; + uint64_t fdir_stat_64[MAX_FLOW_STATS]; CPhyEthIF *lp = &m_ports[port_id]; - if (lp->get_flow_stats(fdir_stat, NULL, 0, MAX_FLOW_STATS, false) == 0) - rte_stat_dump_array(fdir_stat, "FDIR stat", MAX_FLOW_STATS); + if (lp->get_flow_stats(fdir_stat, NULL, 0, MAX_FLOW_STATS, false) == 0) { + for (int i = 0; i < MAX_FLOW_STATS; i++) { + fdir_stat_64[i] = fdir_stat[i].get_pkts(); + } + rte_stat_dump_array(fdir_stat_64, "FDIR stat", MAX_FLOW_STATS); + } } + return (0); } diff --git a/src/flow_stat.cpp b/src/flow_stat.cpp index f03065d2..43bde08b 100644 --- a/src/flow_stat.cpp +++ b/src/flow_stat.cpp @@ -25,6 +25,7 @@ #include <os_time.h> #include "internal_api/trex_platform_api.h" #include "trex_stateless.h" +#include "trex_stateless_messaging.h" #include "trex_stream.h" #include "flow_stat_parser.h" #include "flow_stat.h" @@ -33,7 +34,6 @@ #define FLOW_STAT_ADD_ALL_PORTS 255 static const uint16_t FREE_HW_ID = UINT16_MAX; -static bool no_stat_supported = true; inline std::string methodName(const std::string& prettyFunction) { @@ -106,7 +106,7 @@ int CFlowStatUserIdInfo::add_stream(uint8_t proto) { #endif if (proto != m_proto) - return -1; + throw TrexException("Can't use same pg_id for streams with different l4 protocol"); m_ref_count++; @@ -121,7 +121,7 @@ void CFlowStatUserIdInfo::reset_hw_id() { // Next session will start counting from 0. for (int i = 0; i < TREX_MAX_PORTS; i++) { m_rx_counter_base[i] += m_rx_counter[i]; - m_rx_counter[i] = 0; + memset(&m_rx_counter[i], 0, sizeof(m_rx_counter[0])); m_tx_counter_base[i] += m_tx_counter[i]; memset(&m_tx_counter[i], 0, sizeof(m_tx_counter[0])); } @@ -197,7 +197,7 @@ int CFlowStatUserIdMap::add_stream(uint32_t user_id, uint8_t proto) { if (! c_user_id) { c_user_id = add_user_id(user_id, proto); if (! c_user_id) - return -1; + throw TrexException("Failed adding statistic counter - Failure in add_stream"); return 0; } else { return c_user_id->add_stream(proto); @@ -213,7 +213,7 @@ int CFlowStatUserIdMap::del_stream(uint32_t user_id) { c_user_id = find_user_id(user_id); if (! c_user_id) { - return -1; + throw TrexException("Trying to delete stream which does not exist"); } if (c_user_id->del_stream() == 0) { @@ -236,13 +236,13 @@ int CFlowStatUserIdMap::start_stream(uint32_t user_id, uint16_t hw_id) { if (! c_user_id) { fprintf(stderr, "%s Error: Trying to associate hw id %d to user_id %d but it does not exist\n" , __func__, hw_id, user_id); - return -1; + throw TrexException("Internal error: Trying to associate non exist group id"); } if (c_user_id->is_hw_id()) { - fprintf(stderr, "%s Error: Trying to associate hw id %d to user_id %d but it is already associate to %u\n" + fprintf(stderr, "%s Error: Trying to associate hw id %d to user_id %d but it is already associated to %u\n" , __func__, hw_id, user_id, c_user_id->get_hw_id()); - return -1; + throw TrexException("Internal error: Trying to associate used packet group id to different hardware counter"); } c_user_id->set_hw_id(hw_id); c_user_id->add_started_stream(); @@ -259,9 +259,9 @@ int CFlowStatUserIdMap::start_stream(uint32_t user_id) { c_user_id = find_user_id(user_id); if (! c_user_id) { - fprintf(stderr, "%s Error: Trying to start stream on user_id %d but it does not exist\n" + fprintf(stderr, "%s Error: Trying to start stream on pg_id %d but it does not exist\n" , __func__, user_id); - return -1; + throw TrexException("Trying to start stream with non exist packet group id"); } c_user_id->add_started_stream(); @@ -280,9 +280,9 @@ int CFlowStatUserIdMap::stop_stream(uint32_t user_id) { c_user_id = find_user_id(user_id); if (! c_user_id) { - fprintf(stderr, "%s Error: Trying to stop stream on user_id %d but it does not exist\n" + fprintf(stderr, "%s Error: Trying to stop stream on pg_id %d but it does not exist\n" , __func__, user_id); - return -1; + throw TrexException("Trying to stop stream with non exist packet group id"); } return c_user_id->stop_started_stream(); @@ -385,6 +385,34 @@ void CFlowStatHwIdMap::unmap(uint16_t hw_id) { CFlowStatRuleMgr::CFlowStatRuleMgr() { m_api = NULL; m_max_hw_id = -1; + m_num_started_streams = 0; + m_ring_to_rx = NULL; + m_capabilities = 0; + m_parser = NULL; +} + +CFlowStatRuleMgr::~CFlowStatRuleMgr() { + if (m_parser) + delete m_parser; +} + +void CFlowStatRuleMgr::create() { + uint16_t num_counters, capabilities; + TrexStateless *tstateless = get_stateless_obj(); + assert(tstateless); + + m_api = tstateless->get_platform_api(); + assert(m_api); + m_api->get_interface_stat_info(0, num_counters, capabilities); + m_api->get_port_num(m_num_ports); + for (uint8_t port = 0; port < m_num_ports; port++) { + assert(m_api->reset_hw_flow_stats(port) == 0); + } + m_ring_to_rx = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); + assert(m_ring_to_rx); + m_parser = m_api->get_flow_stat_parser(); + assert(m_parser); + m_capabilities = capabilities; } std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf) { @@ -394,38 +422,30 @@ std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf) { return os; } -int CFlowStatRuleMgr::compile_stream(const TrexStream * stream, Cxl710Parser &parser) { +int CFlowStatRuleMgr::compile_stream(const TrexStream * stream, CFlowStatParser *parser) { #ifdef __DEBUG_FUNC_ENTRY__ std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << " en:"; std::cout << stream->m_rx_check.m_enabled << std::endl; #endif - // currently we support only IP ID rule types - // all our ports are the same type, so testing port 0 is enough - uint16_t num_counters, capabilities; - m_api->get_interface_stat_info(0, num_counters, capabilities); - if ((capabilities & TrexPlatformApi::IF_STAT_IPV4_ID) == 0) { - return -2; - } - - if (parser.parse(stream->m_pkt.binary, stream->m_pkt.len) != 0) { + if (parser->parse(stream->m_pkt.binary, stream->m_pkt.len) != 0) { // if we could not parse the packet, but no stat count needed, it is probably OK. if (stream->m_rx_check.m_enabled) { fprintf(stderr, "Error: %s - Compilation failed\n", __func__); - return -1; + throw TrexException("Failed parsing given packet for flow stat. Probably bad packet format."); } else { return 0; } } - if (!parser.is_fdir_supported()) { + if (!parser->is_stat_supported()) { if (stream->m_stream_id <= 0) { - // rx stat not needed. Do nothing. + // flow stat not needed. Do nothing. return 0; } else { - // rx stat needed, but packet format is not supported - fprintf(stderr, "Error: %s - Unsupported packet format for rx stat\n", __func__); - return -1; + // flow stat needed, but packet format is not supported + fprintf(stderr, "Error: %s - Unsupported packet format for flow stat\n", __func__); + throw TrexException("Unsupported packet format for flow stat on given interface type"); } } return 0; @@ -436,44 +456,36 @@ int CFlowStatRuleMgr::add_stream(const TrexStream * stream) { std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl; #endif - if (! m_api ) { - TrexStateless *tstateless = get_stateless_obj(); - m_api = tstateless->get_platform_api(); - uint16_t num_counters, capabilities; - m_api->get_interface_stat_info(0, num_counters, capabilities); - if ((capabilities & TrexPlatformApi::IF_STAT_IPV4_ID) == 0) { - // All our interfaces are from the same type. If statistics not supported. - // no operation will work - return -1; - } else { - no_stat_supported = false; - } - m_api->get_port_num(m_num_ports); - for (uint8_t port = 0; port < m_num_ports; port++) { - assert(m_api->reset_hw_flow_stats(port) == 0); - } + if (! stream->m_rx_check.m_enabled) { + return 0; } - if (no_stat_supported) - return -ENOTSUP; + // Init everything here, and not in the constructor, since we relay on other objects + // By the time a stream is added everything else is initialized. + if (! m_api ) { + create(); + } - Cxl710Parser parser; - int ret; + uint16_t rule_type = TrexPlatformApi::IF_STAT_IPV4_ID; // In the future need to get it from the stream; - if (! stream->m_rx_check.m_enabled) { - return 0; + if ((m_capabilities & rule_type) == 0) { + fprintf(stderr, "Error: %s - rule type not supported by interface\n", __func__); + throw TrexException("Interface does not support given rule type"); } - if ((ret = compile_stream(stream, parser)) < 0) - return ret; + // compile_stream throws exception if something goes wrong + compile_stream(stream, m_parser); uint8_t l4_proto; - if (parser.get_l4_proto(l4_proto) < 0) { - printf("Error: %s failed finding l4 proto\n", __func__); - return -1; + if (m_parser->get_l4_proto(l4_proto) < 0) { + fprintf(stderr, "Error: %s failed finding l4 proto\n", __func__); + throw TrexException("Failed determining l4 proto for packet"); } - return m_user_id_map.add_stream(stream->m_rx_check.m_pg_id, l4_proto); + // throws exception if there is error + m_user_id_map.add_stream(stream->m_rx_check.m_pg_id, l4_proto); + + return 0; } int CFlowStatRuleMgr::del_stream(const TrexStream * stream) { @@ -481,14 +493,23 @@ int CFlowStatRuleMgr::del_stream(const TrexStream * stream) { std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl; #endif - if (no_stat_supported) - return -ENOTSUP; + if (! m_api) + throw TrexException("Called del_stream, but no stream was added"); if (! stream->m_rx_check.m_enabled) { return 0; } - return m_user_id_map.del_stream(stream->m_rx_check.m_pg_id); + if (m_user_id_map.is_started(stream->m_rx_check.m_pg_id)) { + std::cerr << "Error: Trying to delete flow statistics stream " << stream->m_rx_check.m_pg_id + << " which is not stopped." << std::endl; + throw TrexException("Trying to delete stream which was not stopped"); + } + + // Throws exception in case of error + m_user_id_map.del_stream(stream->m_rx_check.m_pg_id); + + return 0; } // called on all streams, when stream start to transmit @@ -502,33 +523,49 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream, uint16_t &ret_hw_id) { std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl; #endif - Cxl710Parser parser; int ret; - - if (no_stat_supported) - return -ENOTSUP; - - if ((ret = compile_stream(stream, parser)) < 0) - return ret; + // Streams which does not need statistics might be started, before any stream that do + // need statistcs, so start_stream might be called before add_stream + if (! m_api ) { + create(); + } // first handle streams that do not need rx stat if (! stream->m_rx_check.m_enabled) { - // no need for stat count + try { + compile_stream(stream, m_parser); + } catch (TrexException) { + // If no statistics needed, and we can't parse the stream, that's OK. + return 0; + } + uint16_t ip_id; - if (parser.get_ip_id(ip_id) < 0) { - return 0; // if we could not find and ip id, no need to fix + if (m_parser->get_ip_id(ip_id) < 0) { + return 0; // if we could not find the ip id, no need to fix } // verify no reserved IP_ID used, and change if needed if (ip_id >= IP_ID_RESERVE_BASE) { - if (parser.set_ip_id(ip_id & 0xefff) < 0) { - return -1; + if (m_parser->set_ip_id(ip_id & 0xefff) < 0) { + throw TrexException("Stream IP ID in reserved range. Failed changing it"); } } return 0; } - uint16_t hw_id; // from here, we know the stream need rx stat + + // compile_stream throws exception if something goes wrong + if ((ret = compile_stream(stream, m_parser)) < 0) + return ret; + + uint16_t hw_id; + uint16_t rule_type = TrexPlatformApi::IF_STAT_IPV4_ID; // In the future, need to get it from the stream; + + if ((m_capabilities & rule_type) == 0) { + fprintf(stderr, "Error: %s - rule type not supported by interface\n", __func__); + throw TrexException("Interface does not support given rule type"); + } + if (m_user_id_map.is_started(stream->m_rx_check.m_pg_id)) { m_user_id_map.start_stream(stream->m_rx_check.m_pg_id); // just increase ref count; hw_id = m_user_id_map.get_hw_id(stream->m_rx_check.m_pg_id); // can't fail if we got here @@ -536,19 +573,19 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream, uint16_t &ret_hw_id) { hw_id = m_hw_id_map.find_free_hw_id(); if (hw_id == FREE_HW_ID) { printf("Error: %s failed finding free hw_id\n", __func__); - return -1; + throw TrexException("Failed allocating statistic counter. Probably all are used."); } else { if (hw_id > m_max_hw_id) { m_max_hw_id = hw_id; } uint32_t user_id = stream->m_rx_check.m_pg_id; - m_user_id_map.start_stream(user_id, hw_id); + m_user_id_map.start_stream(user_id, hw_id); // ??? can throw exception. return hw_id m_hw_id_map.map(hw_id, user_id); add_hw_rule(hw_id, m_user_id_map.l4_proto(user_id)); } } - parser.set_ip_id(IP_ID_RESERVE_BASE + hw_id); + m_parser->set_ip_id(IP_ID_RESERVE_BASE + hw_id); ret_hw_id = hw_id; @@ -556,6 +593,10 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream, uint16_t &ret_hw_id) { std::cout << "exit:" << __METHOD_NAME__ << " hw_id:" << ret_hw_id << std::endl; #endif + if (m_num_started_streams == 0) { + send_start_stop_msg_to_rx(true); // First transmitting stream. Rx core should start reading packets; + } + m_num_started_streams++; return 0; } @@ -571,13 +612,13 @@ int CFlowStatRuleMgr::stop_stream(const TrexStream * stream) { #ifdef __DEBUG_FUNC_ENTRY__ std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl; #endif - if (no_stat_supported) - return -ENOTSUP; - if (! stream->m_rx_check.m_enabled) { return 0; } + if (! m_api) + throw TrexException("Called stop_stream, but no stream was added"); + if (m_user_id_map.stop_stream(stream->m_rx_check.m_pg_id) == 0) { // last stream associated with the entry stopped transmittig. // remove user_id <--> hw_id mapping @@ -585,12 +626,12 @@ int CFlowStatRuleMgr::stop_stream(const TrexStream * stream) { uint16_t hw_id = m_user_id_map.get_hw_id(stream->m_rx_check.m_pg_id); if (hw_id >= MAX_FLOW_STATS) { fprintf(stderr, "Error: %s got wrong hw_id %d from unmap\n", __func__, hw_id); - return -1; + throw TrexException("Internal error in stop_stream. Got bad hw_id"); } else { // update counters, and reset before unmapping CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map.get_user_id(hw_id)); assert(p_user_id != NULL); - uint64_t rx_counter; + rx_per_flow_t rx_counter; tx_per_flow_t tx_counter; for (uint8_t port = 0; port < m_num_ports; port++) { m_api->del_rx_flow_stat_rule(port, FLOW_STAT_RULE_TYPE_IPV4_ID, proto, hw_id); @@ -605,6 +646,11 @@ int CFlowStatRuleMgr::stop_stream(const TrexStream * stream) { m_hw_id_map.unmap(hw_id); } } + m_num_started_streams--; + assert (m_num_started_streams >= 0); + if (m_num_started_streams == 0) { + send_start_stop_msg_to_rx(false); // No more transmittig streams. Rx core shoulde get into idle loop. + } return 0; } @@ -618,16 +664,28 @@ int CFlowStatRuleMgr::get_active_pgids(flow_stat_active_t &result) { return 0; } +extern bool rx_should_stop; +void CFlowStatRuleMgr::send_start_stop_msg_to_rx(bool is_start) { + TrexStatelessCpToRxMsgBase *msg; + + if (is_start) { + msg = new TrexStatelessRxStartMsg(); + } else { + msg = new TrexStatelessRxStopMsg(); + } + m_ring_to_rx->Enqueue((CGenNode *)msg); +} + // return false if no counters changed since last run. true otherwise bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) { - uint64_t rx_stats[MAX_FLOW_STATS]; + rx_per_flow_t rx_stats[MAX_FLOW_STATS]; tx_per_flow_t tx_stats[MAX_FLOW_STATS]; Json::FastWriter writer; Json::Value root; root["name"] = "flow_stats"; root["type"] = 0; - + if (baseline) { root["baseline"] = true; } @@ -645,15 +703,16 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) { for (uint8_t port = 0; port < m_num_ports; port++) { m_api->get_flow_stats(port, rx_stats, (void *)tx_stats, 0, m_max_hw_id, false); for (int i = 0; i <= m_max_hw_id; i++) { - if (rx_stats[i] != 0) { + if (rx_stats[i].get_pkts() != 0) { + rx_per_flow_t rx_pkts = rx_stats[i]; CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map.get_user_id(i)); if (likely(p_user_id != NULL)) { - if (p_user_id->get_rx_counter(port) != rx_stats[i]) { - p_user_id->set_rx_counter(port, rx_stats[i]); + if (p_user_id->get_rx_counter(port) != rx_pkts) { + p_user_id->set_rx_counter(port, rx_pkts); p_user_id->set_need_to_send_rx(port); } } else { - std::cerr << __METHOD_NAME__ << i << ":Could not count " << rx_stats[i] << " rx packets, on port " + std::cerr << __METHOD_NAME__ << i << ":Could not count " << rx_pkts << " rx packets, on port " << (uint16_t)port << ", because no mapping was found." << std::endl; } } @@ -690,7 +749,8 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) { std::string str_port = static_cast<std::ostringstream*>( &(std::ostringstream() << int(port) ) )->str(); if (user_id_info->need_to_send_rx(port) || baseline) { user_id_info->set_no_need_to_send_rx(port); - data_section[str_user_id]["rx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_rx_counter(port)); + data_section[str_user_id]["rx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_rx_counter(port).get_pkts()); + data_section[str_user_id]["rx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_rx_counter(port).get_bytes()); send_empty = false; } if (user_id_info->need_to_send_tx(port) || baseline) { diff --git a/src/flow_stat.h b/src/flow_stat.h index 3e00a180..ea33062d 100644 --- a/src/flow_stat.h +++ b/src/flow_stat.h @@ -26,6 +26,7 @@ #include <map> #include "trex_defs.h" #include "trex_stream.h" +#include "msg_manager.h" #include <internal_api/trex_platform_api.h> // range reserved for rx stat measurement is from IP_ID_RESERVE_BASE to 0xffff @@ -50,7 +51,7 @@ class tx_per_flow_t_ { inline void set_bytes(uint64_t bytes) { m_bytes = bytes;; } - inline void get_pkts(uint64_t pkts) { + inline void set_pkts(uint64_t pkts) { m_pkts = pkts; } inline void add_bytes(uint64_t bytes) { @@ -100,16 +101,17 @@ class tx_per_flow_t_ { }; typedef class tx_per_flow_t_ tx_per_flow_t; +typedef class tx_per_flow_t_ rx_per_flow_t; class CPhyEthIF; -class Cxl710Parser; +class CFlowStatParser; class CFlowStatUserIdInfo { public: CFlowStatUserIdInfo(uint8_t proto); friend std::ostream& operator<<(std::ostream& os, const CFlowStatUserIdInfo& cf); - void set_rx_counter(uint8_t port, uint64_t val) {m_rx_counter[port] = val;} - uint64_t get_rx_counter(uint8_t port) {return m_rx_counter[port] + m_rx_counter_base[port];} + void set_rx_counter(uint8_t port, rx_per_flow_t val) {m_rx_counter[port] = val;} + rx_per_flow_t get_rx_counter(uint8_t port) {return m_rx_counter[port] + m_rx_counter_base[port];} void set_tx_counter(uint8_t port, tx_per_flow_t val) {m_tx_counter[port] = val;} tx_per_flow_t get_tx_counter(uint8_t port) {return m_tx_counter[port] + m_tx_counter_base[port];} void set_hw_id(uint16_t hw_id) {m_hw_id = hw_id;} @@ -135,16 +137,16 @@ class CFlowStatUserIdInfo { private: bool m_rx_changed[TREX_MAX_PORTS]; // Which RX counters changed since we last published bool m_tx_changed[TREX_MAX_PORTS]; // Which TX counters changed since we last published - uint64_t m_rx_counter[TREX_MAX_PORTS]; // How many packets received with this user id since stream start + rx_per_flow_t m_rx_counter[TREX_MAX_PORTS]; // How many packets received with this user id since stream start // How many packets received with this user id, since stream creation, before stream start. - uint64_t m_rx_counter_base[TREX_MAX_PORTS]; + rx_per_flow_t m_rx_counter_base[TREX_MAX_PORTS]; tx_per_flow_t m_tx_counter[TREX_MAX_PORTS]; // How many packets transmitted with this user id since stream start // How many packets transmitted with this user id, since stream creation, before stream start. tx_per_flow_t m_tx_counter_base[TREX_MAX_PORTS]; uint16_t m_hw_id; // Associated hw id. UINT16_MAX if no associated hw id. uint8_t m_proto; // protocol (UDP, TCP, other), associated with this user id. - uint8_t m_ref_count; // How many streams with this ref count exists - uint8_t m_trans_ref_count; // How many streams with this ref count currently transmit + uint8_t m_ref_count; // How many streams with this user id exists + uint8_t m_trans_ref_count; // How many streams with this user id currently transmit bool m_was_sent; // Did we send this info to clients once? }; @@ -196,6 +198,7 @@ class CFlowStatRuleMgr { }; CFlowStatRuleMgr(); + ~CFlowStatRuleMgr(); friend std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf); int add_stream(const TrexStream * stream); int del_stream(const TrexStream * stream); @@ -205,8 +208,10 @@ class CFlowStatRuleMgr { bool dump_json(std::string & json, bool baseline); private: - int compile_stream(const TrexStream * stream, Cxl710Parser &parser); + void create(); + int compile_stream(const TrexStream * stream, CFlowStatParser *parser); int add_hw_rule(uint16_t hw_id, uint8_t proto); + void send_start_stop_msg_to_rx(bool is_start); private: CFlowStatHwIdMap m_hw_id_map; // map hw ids to user ids @@ -214,6 +219,10 @@ class CFlowStatRuleMgr { uint8_t m_num_ports; // How many ports are being used const TrexPlatformApi *m_api; int m_max_hw_id; // max hw id we ever used + uint32_t m_num_started_streams; // How many started (transmitting) streams we have + CNodeRing *m_ring_to_rx; // handle for sending messages to Rx core + CFlowStatParser *m_parser; + uint16_t m_capabilities; }; #endif diff --git a/src/flow_stat_parser.cpp b/src/flow_stat_parser.cpp index 52824f73..8a77c82d 100644 --- a/src/flow_stat_parser.cpp +++ b/src/flow_stat_parser.cpp @@ -25,38 +25,36 @@ #include <common/Network/Packet/EthernetHeader.h> #include <flow_stat_parser.h> -Cxl710Parser::Cxl710Parser() { - reset(); -} - -void Cxl710Parser::reset() { +void CFlowStatParser::reset() { m_ipv4 = 0; m_l4_proto = 0; - m_fdir_supported = false; + m_stat_supported = false; } -int Cxl710Parser::parse(uint8_t *p, uint16_t len) { +int CFlowStatParser::parse(uint8_t *p, uint16_t len) { EthernetHeader *ether = (EthernetHeader *)p; + reset(); + switch( ether->getNextProtocol() ) { case EthernetHeader::Protocol::IP : m_ipv4 = (IPHeader *)(p + 14); - m_fdir_supported = true; + m_stat_supported = true; break; case EthernetHeader::Protocol::VLAN : switch ( ether->getVlanProtocol() ){ case EthernetHeader::Protocol::IP: m_ipv4 = (IPHeader *)(p + 18); - m_fdir_supported = true; + m_stat_supported = true; break; default: - m_fdir_supported = false; + m_stat_supported = false; return -1; } break; default: - m_fdir_supported = false; + m_stat_supported = false; return -1; break; } @@ -64,7 +62,7 @@ int Cxl710Parser::parse(uint8_t *p, uint16_t len) { return 0; } -int Cxl710Parser::get_ip_id(uint16_t &ip_id) { +int CFlowStatParser::get_ip_id(uint16_t &ip_id) { if (! m_ipv4) return -1; @@ -73,7 +71,7 @@ int Cxl710Parser::get_ip_id(uint16_t &ip_id) { return 0; } -int Cxl710Parser::set_ip_id(uint16_t new_id) { +int CFlowStatParser::set_ip_id(uint16_t new_id) { if (! m_ipv4) return -1; @@ -84,7 +82,7 @@ int Cxl710Parser::set_ip_id(uint16_t new_id) { return 0; } -int Cxl710Parser::get_l4_proto(uint8_t &proto) { +int CFlowStatParser::get_l4_proto(uint8_t &proto) { if (! m_ipv4) return -1; @@ -96,7 +94,7 @@ int Cxl710Parser::get_l4_proto(uint8_t &proto) { static const uint16_t TEST_IP_ID = 0xabcd; static const uint8_t TEST_L4_PROTO = 0x11; -int Cxl710Parser::test() { +int CFlowStatParser::test() { uint16_t ip_id = 0; uint8_t l4_proto; uint8_t test_pkt[] = { @@ -124,14 +122,34 @@ int Cxl710Parser::test() { assert(m_ipv4->isChecksumOK() == true); assert(get_l4_proto(l4_proto) == 0); assert(l4_proto == TEST_L4_PROTO); - assert(m_fdir_supported == true); + assert(m_stat_supported == true); reset(); // bad packet test_pkt[16] = 0xaa; assert (parse(test_pkt, sizeof(test_pkt)) == -1); - assert(m_fdir_supported == false); + assert(m_stat_supported == false); + + return 0; +} + +// In 82599 10G card we do not support VLANs +int C82599Parser::parse(uint8_t *p, uint16_t len) { + EthernetHeader *ether = (EthernetHeader *)p; + + reset(); + + switch( ether->getNextProtocol() ) { + case EthernetHeader::Protocol::IP : + m_ipv4 = (IPHeader *)(p + 14); + m_stat_supported = true; + break; + default: + m_stat_supported = false; + return -1; + break; + } return 0; } diff --git a/src/flow_stat_parser.h b/src/flow_stat_parser.h index 606a1bec..8c9e1418 100644 --- a/src/flow_stat_parser.h +++ b/src/flow_stat_parser.h @@ -19,19 +19,33 @@ limitations under the License. */ -class Cxl710Parser { +#ifndef __FLOW_STAT_PARSER_H__ +#define __FLOW_STAT_PARSER_H__ + +// Basic flow stat parser. Relevant for xl710/x710/x350 cards +#include "common/Network/Packet/IPHeader.h" + +class CFlowStatParser { public: - Cxl710Parser(); - void reset(); - int parse(uint8_t *pkt, uint16_t len); - bool is_fdir_supported() {return m_fdir_supported == true;}; - int get_ip_id(uint16_t &ip_id); - int set_ip_id(uint16_t ip_id); - int get_l4_proto(uint8_t &proto); - int test(); + virtual ~CFlowStatParser() {}; + virtual void reset(); + virtual int parse(uint8_t *pkt, uint16_t len); + virtual bool is_stat_supported() {return m_stat_supported == true;}; + virtual int get_ip_id(uint16_t &ip_id); + virtual int set_ip_id(uint16_t ip_id); + virtual int get_l4_proto(uint8_t &proto); + virtual int test(); - private: + protected: IPHeader *m_ipv4; - bool m_fdir_supported; + bool m_stat_supported; uint8_t m_l4_proto; }; + +class C82599Parser : public CFlowStatParser { + public: + ~C82599Parser() {}; + int parse(uint8_t *pkt, uint16_t len); +}; + +#endif diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp index c3dfcb95..a5cf3307 100644 --- a/src/gtest/trex_stateless_gtest.cpp +++ b/src/gtest/trex_stateless_gtest.cpp @@ -3581,7 +3581,7 @@ class rx_stat_pkt_parse : public testing::Test { TEST_F(rx_stat_pkt_parse, x710_parser) { - Cxl710Parser parser; + CFlowStatParser parser; parser.test(); } diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h index f8f76584..90eaa7c7 100644 --- a/src/internal_api/trex_platform_api.h +++ b/src/internal_api/trex_platform_api.h @@ -26,6 +26,7 @@ limitations under the License. #include <vector> #include <string> #include <string.h> +#include "flow_stat_parser.h" #include "trex_defs.h" /** @@ -34,6 +35,7 @@ limitations under the License. * @author imarom (06-Oct-15) */ + class TrexPlatformGlobalStats { public: TrexPlatformGlobalStats() { @@ -42,7 +44,7 @@ public: struct { double m_cpu_util; - + double m_rx_cpu_util; double m_tx_bps; double m_rx_bps; @@ -142,7 +144,7 @@ public: virtual void publish_async_data_now(uint32_t key, bool baseline) const = 0; virtual uint8_t get_dp_core_count() const = 0; virtual void get_interface_stat_info(uint8_t interface_id, uint16_t &num_counters, uint16_t &capabilities) const =0; - virtual int get_flow_stats(uint8_t port_id, uint64_t *stats, void *tx_stats, int min, int max, bool reset) const = 0; + virtual int get_flow_stats(uint8_t port_id, void *stats, void *tx_stats, int min, int max, bool reset) const = 0; virtual int reset_hw_flow_stats(uint8_t port_id) const = 0; virtual void get_port_num(uint8_t &port_num) const = 0; virtual int add_rx_flow_stat_rule(uint8_t port_id, uint8_t type, uint16_t proto, uint16_t id) const = 0; @@ -151,6 +153,7 @@ public: virtual bool get_promiscuous(uint8_t port_id) const = 0; virtual void flush_dp_messages() const = 0; virtual int get_active_pgids(flow_stat_active_t &result) const = 0; + virtual CFlowStatParser *get_flow_stat_parser() const = 0; virtual ~TrexPlatformApi() {} }; @@ -171,7 +174,7 @@ public: void publish_async_data_now(uint32_t key, bool baseline) const; uint8_t get_dp_core_count() const; void get_interface_stat_info(uint8_t interface_id, uint16_t &num_counters, uint16_t &capabilities) const; - int get_flow_stats(uint8_t port_id, uint64_t *stats, void *tx_stats, int min, int max, bool reset) const; + int get_flow_stats(uint8_t port_id, void *stats, void *tx_stats, int min, int max, bool reset) const; int reset_hw_flow_stats(uint8_t port_id) const; void get_port_num(uint8_t &port_num) const; int add_rx_flow_stat_rule(uint8_t port_id, uint8_t type, uint16_t proto, uint16_t id) const; @@ -180,6 +183,7 @@ public: bool get_promiscuous(uint8_t port_id) const; void flush_dp_messages() const; int get_active_pgids(flow_stat_active_t &result) const; + CFlowStatParser *get_flow_stat_parser() const; }; @@ -225,7 +229,7 @@ public: virtual void publish_async_data_now(uint32_t key, bool baseline) const { } - virtual int get_flow_stats(uint8_t port_id, uint64_t *stats, void *tx_stats, int min, int max, bool reset) const {return 0;}; + virtual int get_flow_stats(uint8_t port_id, void *stats, void *tx_stats, int min, int max, bool reset) const {return 0;}; virtual int reset_hw_flow_stats(uint8_t port_id) const {return 0;}; virtual void get_port_num(uint8_t &port_num) const {port_num = 2;}; virtual int add_rx_flow_stat_rule(uint8_t port_id, uint8_t type, uint16_t proto, uint16_t id) const {return 0;} @@ -241,6 +245,7 @@ public: void flush_dp_messages() const { } int get_active_pgids(flow_stat_active_t &result) const {return 0;} + CFlowStatParser *get_flow_stat_parser() const {return new CFlowStatParser();} private: int m_dp_core_count; diff --git a/src/latency.cpp b/src/latency.cpp index d57e97c8..fff7935d 100644 --- a/src/latency.cpp +++ b/src/latency.cpp @@ -177,6 +177,9 @@ void CCPortLatency::reset(){ m_seq_error=0; m_length_error=0; m_no_ipv4_option=0; + for (int i = 0; i < MAX_FLOW_STATS; i++) { + m_rx_pg_stat[i].clear(); + } m_hist.Reset(); } @@ -628,8 +631,8 @@ void CLatencyManager::handle_rx_pkt(CLatencyManagerPerPort * lp, rte_pktmbuf_free(m); } -void CLatencyManager::handle_latency_pkt_msg(uint8_t thread_id, - CGenNodeLatencyPktInfo * msg){ +// In VM, we receive the RX packets in DP core, and send message to RX core with the packet +void CLatencyManager::handle_latency_pkt_msg(uint8_t thread_id, CGenNodeLatencyPktInfo * msg) { assert(msg->m_latency_offset==0xdead); @@ -666,6 +669,7 @@ void CLatencyManager::run_rx_queue_msgs(uint8_t thread_id, } } +// VM mode function. Handle messages from DP void CLatencyManager::try_rx_queues(){ CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp(); @@ -679,7 +683,6 @@ void CLatencyManager::try_rx_queues(){ } } - void CLatencyManager::try_rx(){ rte_mbuf_t * rx_pkts[64]; int i; @@ -712,7 +715,7 @@ void CLatencyManager::reset(){ } -void CLatencyManager::start(int iter){ +void CLatencyManager::start(int iter) { m_do_stop =false; m_is_active =false; int cnt=0; diff --git a/src/latency.h b/src/latency.h index 1f8ef5c0..3dd1cc36 100644 --- a/src/latency.h +++ b/src/latency.h @@ -86,6 +86,7 @@ public: bool Parse(); uint8_t getTTl(); + uint16_t getIpId(); uint16_t getPktSize(); // Check if packet contains latency data @@ -243,9 +244,8 @@ public: uint64_t m_seq_error; uint64_t m_rx_check; uint64_t m_no_ipv4_option; - - uint64_t m_length_error; + rx_per_flow_t m_rx_pg_stat[MAX_FLOW_STATS]; CTimeHistogram m_hist; /* all window */ CJitter m_jitter; }; diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index 1b750bbd..46e9a95e 100644 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -58,6 +58,7 @@ #include "stateless/cp/trex_stateless.h" #include "stateless/dp/trex_stream_node.h" #include "stateless/messaging/trex_stateless_messaging.h" +#include "stateless/rx/trex_stateless_rx_core.h" #include "publisher/trex_publisher.h" #include "../linux_dpdk/version.h" extern "C" { @@ -106,7 +107,7 @@ static inline int get_vm_one_queue_enable(){ } static inline int get_is_rx_thread_enabled() { - return (CGlobalInfo::m_options.is_rx_enabled() ?1:0); + return ((CGlobalInfo::m_options.is_rx_enabled() || CGlobalInfo::m_options.is_stateless()) ?1:0); } struct port_cfg_t; @@ -140,10 +141,14 @@ public: virtual int wait_for_stable_link()=0; virtual void wait_after_link_up(){}; virtual bool flow_control_disable_supported(){return true;} - virtual int get_rx_stats(CPhyEthIF * _if, uint32_t *stats, uint32_t *prev_stats, int min, int max) {return -1;} + virtual bool hw_rx_stat_supported(){return false;} + virtual int get_rx_stats(CPhyEthIF * _if, uint32_t *pkts, uint32_t *prev_pkts, uint32_t *bytes, uint32_t *prev_bytes + , int min, int max) {return -1;} + virtual int reset_rx_stats(CPhyEthIF * _if, uint32_t *stats) {return 0;} virtual int dump_fdir_global_stats(CPhyEthIF * _if, FILE *fd) { return -1;} virtual int get_stat_counters_num() {return 0;} virtual int get_rx_stat_capabilities() {return 0;} + virtual CFlowStatParser *get_flow_stat_parser(); }; @@ -174,8 +179,8 @@ public: virtual int configure_drop_queue(CPhyEthIF * _if); virtual int configure_rx_filter_rules(CPhyEthIF * _if); - int configure_rx_filter_rules_statefull(CPhyEthIF * _if); - int configure_rx_filter_rules_stateless(CPhyEthIF * _if); + virtual int configure_rx_filter_rules_statefull(CPhyEthIF * _if); + virtual int configure_rx_filter_rules_stateless(CPhyEthIF * _if); virtual bool is_hardware_support_drop_queue(){ return(true); @@ -184,9 +189,11 @@ public: virtual void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats); virtual void clear_extended_stats(CPhyEthIF * _if); - + virtual int dump_fdir_global_stats(CPhyEthIF * _if, FILE *fd) {return 0;} + virtual int get_stat_counters_num() {return MAX_FLOW_STATS;} + virtual int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;} virtual int wait_for_stable_link(); - void wait_after_link_up(); + virtual void wait_after_link_up(); }; class CTRexExtendedDriverBase1GVm : public CTRexExtendedDriverBase { @@ -236,6 +243,8 @@ public: virtual void clear_extended_stats(CPhyEthIF * _if); virtual int wait_for_stable_link(); + virtual int get_stat_counters_num() {return MAX_FLOW_STATS;} + virtual int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;} }; @@ -262,15 +271,18 @@ public: virtual bool is_hardware_filter_is_supported(){ return (true); } - virtual int configure_rx_filter_rules(CPhyEthIF * _if); - + virtual int configure_rx_filter_rules_stateless(CPhyEthIF * _if); + virtual int configure_rx_filter_rules_statefull(CPhyEthIF * _if); virtual bool is_hardware_support_drop_queue(){ return(true); } virtual void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats); virtual void clear_extended_stats(CPhyEthIF * _if); virtual int wait_for_stable_link(); + virtual int get_stat_counters_num() {return MAX_FLOW_STATS;} + virtual int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;} + virtual CFlowStatParser *get_flow_stat_parser(); }; class CTRexExtendedDriverBase40G : public CTRexExtendedDriverBase10G { @@ -313,16 +325,21 @@ public: } virtual void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats); virtual void clear_extended_stats(CPhyEthIF * _if); - int get_rx_stats(CPhyEthIF * _if, uint32_t *stats, uint32_t *prev_stats, int min, int max); - int dump_fdir_global_stats(CPhyEthIF * _if, FILE *fd); - int get_stat_counters_num() {return MAX_FLOW_STATS;} - int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;} + virtual int reset_rx_stats(CPhyEthIF * _if, uint32_t *stats); + virtual int get_rx_stats(CPhyEthIF * _if, uint32_t *pkts, uint32_t *prev_pkts, uint32_t *bytes, uint32_t *prev_bytes, int min, int max); + virtual int dump_fdir_global_stats(CPhyEthIF * _if, FILE *fd); + virtual int get_stat_counters_num() {return MAX_FLOW_STATS;} + virtual int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;} virtual int wait_for_stable_link(); // disabling flow control on 40G using DPDK API causes the interface to malfunction - bool flow_control_disable_supported(){return false;} + virtual bool flow_control_disable_supported(){return false;} + virtual bool hw_rx_stat_supported(){return true;} + virtual CFlowStatParser *get_flow_stat_parser(); + private: - void add_del_rules(enum rte_filter_op op, uint8_t port_id, uint16_t type, uint8_t ttl, uint16_t ip_id, int queue, uint16_t stat_idx); + virtual void add_del_rules(enum rte_filter_op op, uint8_t port_id, uint16_t type, uint8_t ttl, uint16_t ip_id, int queue, uint16_t stat_idx); virtual int configure_rx_filter_rules_statfull(CPhyEthIF * _if); + private: uint8_t m_if_per_card; }; @@ -1019,11 +1036,11 @@ static int parse_options(int argc, char *argv[], CParserOption* po, bool first_t } if ( (po->is_latency_enabled()) || (po->preview.getOnlyLatency()) ){ - parse_err("Latecny check is not supported with interactive mode "); + parse_err("Latency check is not supported with interactive mode "); } if ( po->preview.getSingleCore() ){ - parse_err("single core is not supported with interactive mode "); + parse_err("Single core is not supported with interactive mode "); } } @@ -1096,15 +1113,19 @@ public: m_port_conf.fdir_conf.status=RTE_FDIR_NO_REPORT_STATUS; /* Offset of flexbytes field in RX packets (in 16-bit word units). */ /* Note: divide by 2 to convert byte offset to word offset */ - if ( CGlobalInfo::m_options.preview.get_ipv6_mode_enable() ){ - m_port_conf.fdir_conf.flexbytes_offset=(14+6)/2; - }else{ - m_port_conf.fdir_conf.flexbytes_offset=(14+8)/2; - } + if (get_is_stateless()) { + m_port_conf.fdir_conf.flexbytes_offset = (14+4)/2; + } else { + if ( CGlobalInfo::m_options.preview.get_ipv6_mode_enable() ) { + m_port_conf.fdir_conf.flexbytes_offset = (14+6)/2; + } else { + m_port_conf.fdir_conf.flexbytes_offset = (14+8)/2; + } - /* Increment offset 4 bytes for the case where we add VLAN */ - if ( CGlobalInfo::m_options.preview.get_vlan_mode_enable() ){ - m_port_conf.fdir_conf.flexbytes_offset+=(4/2); + /* Increment offset 4 bytes for the case where we add VLAN */ + if ( CGlobalInfo::m_options.preview.get_vlan_mode_enable() ) { + m_port_conf.fdir_conf.flexbytes_offset += (4/2); + } } m_port_conf.fdir_conf.drop_queue=1; } @@ -1180,7 +1201,8 @@ void CPhyEthIFStats::Clear(){ oerrors = 0; imcasts = 0; rx_nombuf = 0; - memset(m_rx_per_flow, 0, sizeof(m_rx_per_flow)); + memset(m_rx_per_flow_pkts, 0, sizeof(m_rx_per_flow_pkts)); + memset(m_rx_per_flow_bytes, 0, sizeof(m_rx_per_flow_bytes)); } @@ -1214,6 +1236,7 @@ void CPhyEthIFStats::Dump(FILE *fd){ DP_A(rx_nombuf); } +// Clear the RX queue of an interface, dropping all packets void CPhyEthIF::flush_rx_queue(void){ rte_mbuf_t * rx_pkts[32]; @@ -1786,6 +1809,9 @@ bool CCoreEthIF::Create(uint8_t core_id, return (true); } +// This function is only relevant if we are in VM. In this case, we only have one rx queue. Can't have +// rules to drop queue 0, and pass queue 1 to RX core, like in other cases. +// We receive all packets in the same core that transmitted, and handle them to RX core. void CCoreEthIF::flush_rx_queue(void){ pkt_dir_t dir ; bool is_rx = get_is_rx_thread_enabled(); @@ -2300,6 +2326,7 @@ public: float m_active_flows; float m_open_flows; float m_cpu_util; + float m_rx_cpu_util; uint8_t m_threads; uint32_t m_num_of_ports; @@ -2588,15 +2615,17 @@ public: int queues_prob_init(); int ixgbe_start(); int ixgbe_rx_queue_flush(); - int ixgbe_configure_mg(); + void ixgbe_configure_mg(); + void rx_sl_configure(); bool is_all_links_are_up(bool dump=false); int reset_counters(); private: - /* try to stop all datapath cores */ - void try_stop_all_dp(); + /* try to stop all datapath cores and RX core */ + void try_stop_all_cores(); /* send message to all dp cores */ int send_message_all_dp(TrexStatelessCpToDpMsgBase *msg); + int send_message_to_rx(TrexStatelessCpToRxMsgBase *msg); void check_for_dp_message_from_core(int thread_id); public: @@ -2604,7 +2633,6 @@ public: int start_master_statefull(); int start_master_stateless(); int run_in_core(virtual_thread_id_t virt_core_id); - int stop_core(virtual_thread_id_t virt_core_id); int core_for_rx(){ if ( (! get_is_rx_thread_enabled()) ) { return -1; @@ -2675,8 +2703,10 @@ public: CParserOption m_po ; CFlowGenList m_fl; bool m_fl_was_init; - volatile uint8_t m_signal[BP_MAX_CORES] __rte_cache_aligned ; - CLatencyManager m_mg; + volatile uint8_t m_signal[BP_MAX_CORES] __rte_cache_aligned ; // Signal to main core when DP thread finished + volatile bool m_rx_running; // Signal main core when RX thread finished + CLatencyManager m_mg; // statefull RX core + CRxCoreStateless m_rx_sl; // stateless RX core CTrexGlobalIoMode m_io_modes; private: @@ -2763,12 +2793,14 @@ bool CGlobalTRex::is_all_links_are_up(bool dump){ return (all_link_are); } +void CGlobalTRex::try_stop_all_cores(){ -void CGlobalTRex::try_stop_all_dp(){ - - TrexStatelessDpQuit * msg= new TrexStatelessDpQuit(); - send_message_all_dp(msg); - delete msg; + TrexStatelessDpQuit * dp_msg= new TrexStatelessDpQuit(); + TrexStatelessRxQuit * rx_msg= new TrexStatelessRxQuit(); + send_message_all_dp(dp_msg); + send_message_to_rx(rx_msg); + delete dp_msg; + // no need to delete rx_msg. Deleted by receiver bool all_core_finished = false; int i; for (i=0; i<20; i++) { @@ -2799,6 +2831,13 @@ int CGlobalTRex::send_message_all_dp(TrexStatelessCpToDpMsgBase *msg){ return (0); } +int CGlobalTRex::send_message_to_rx(TrexStatelessCpToRxMsgBase *msg) { + CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); + ring->Enqueue((CGenNode *) msg); + + return (0); +} + int CGlobalTRex::ixgbe_rx_queue_flush(){ int i; @@ -2810,7 +2849,7 @@ int CGlobalTRex::ixgbe_rx_queue_flush(){ } -int CGlobalTRex::ixgbe_configure_mg(void){ +void CGlobalTRex::ixgbe_configure_mg(void) { int i; CLatencyManagerCfg mg_cfg; mg_cfg.m_max_ports = m_max_ports; @@ -2850,10 +2889,34 @@ int CGlobalTRex::ixgbe_configure_mg(void){ m_mg.Create(&mg_cfg); m_mg.set_mask(CGlobalInfo::m_options.m_latency_mask); - - return (0); } +// init m_rx_sl object for stateless rx core +void CGlobalTRex::rx_sl_configure(void) { + CRxSlCfg rx_sl_cfg; + int i; + + rx_sl_cfg.m_max_ports = m_max_ports; + + if ( get_vm_one_queue_enable() ) { + /* vm mode, indirect queues */ + for (i=0; i < m_max_ports; i++) { + CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp(); + uint8_t thread_id = (i >> 1); + CNodeRing * r = rx_dp->getRingCpToDp(thread_id); + m_latency_vm_vports[i].Create((uint8_t)i, r, &m_mg); + rx_sl_cfg.m_ports[i] = &m_latency_vm_vports[i]; + } + } else { + for (i = 0; i < m_max_ports; i++) { + CPhyEthIF * _if = &m_ports[i]; + m_latency_vports[i].Create(_if, m_latency_tx_queue_id, 1); + rx_sl_cfg.m_ports[i] = &m_latency_vports[i]; + } + } + + m_rx_sl.create(rx_sl_cfg); +} int CGlobalTRex::ixgbe_start(void){ int i; @@ -2971,8 +3034,11 @@ int CGlobalTRex::ixgbe_start(void){ ixgbe_rx_queue_flush(); - - ixgbe_configure_mg(); + if (! get_is_stateless()) { + ixgbe_configure_mg(); + } else { + rx_sl_configure(); + } /* core 0 - control @@ -3361,6 +3427,9 @@ void CGlobalTRex::get_stats(CGlobalStats & stats){ stats.m_num_of_ports = m_max_ports; stats.m_cpu_util = m_fl.GetCpuUtil(); + if (get_is_stateless()) { + stats.m_rx_cpu_util = m_rx_sl.get_cpu_util(); + } stats.m_threads = m_fl.m_threads_info.size(); for (i=0; i<m_max_ports; i++) { @@ -3724,7 +3793,7 @@ int CGlobalTRex::run_in_master() { if (!is_all_cores_finished()) { /* probably CLTR-C */ - try_stop_all_dp(); + try_stop_all_cores(); } m_mg.stop(); @@ -3739,16 +3808,17 @@ int CGlobalTRex::run_in_master() { int CGlobalTRex::run_in_rx_core(void){ - if ( CGlobalInfo::m_options.is_rx_enabled() ){ - m_mg.start(0); + if (get_is_stateless()) { + m_rx_running = true; + m_rx_sl.start(); + } else { + if ( CGlobalInfo::m_options.is_rx_enabled() ){ + m_rx_running = true; + m_mg.start(0); + } } - // ??? start stateless rx - return (0); -} - -int CGlobalTRex::stop_core(virtual_thread_id_t virt_core_id){ - m_signal[virt_core_id]=1; + m_rx_running = false; return (0); } @@ -3833,14 +3903,17 @@ int CGlobalTRex::stop_master(){ return (0); } -bool CGlobalTRex::is_all_cores_finished(){ +bool CGlobalTRex::is_all_cores_finished() { int i; for (i=0; i<get_cores_tx(); i++) { if ( m_signal[i+1]==0){ - return (false); + return false; } } - return (true); + if (m_rx_running) + return false; + + return true; } @@ -3926,48 +3999,60 @@ int CGlobalTRex::start_master_statefull() { //////////////////////////////////////////// - static CGlobalTRex g_trex; -// The HW counters start from some random values. The driver give us the diffs from previous, -// each time we do get_rx_stats. We need to make one first call, at system startup, -// and ignore the returned diffs int CPhyEthIF::reset_hw_flow_stats() { - uint32_t diff_stats[MAX_FLOW_STATS]; - - if (get_ex_drv()->get_rx_stats(this, diff_stats, m_stats.m_fdir_prev_stats, 0, MAX_FLOW_STATS - 1) < 0) { - return -1; + if (get_ex_drv()->hw_rx_stat_supported()) { + if (get_ex_drv()->reset_rx_stats(this, m_stats.m_fdir_prev_pkts) < 0) { + return -1; + } + } else { + g_trex.m_rx_sl.reset_rx_stats(get_port_id()); } - return 0; } // get/reset flow director counters // return 0 if OK. -1 if operation not supported. -// rx_stats, tx_stats - arrays of len max - min + 1. Returning rx, tx updated values. +// rx_stats, tx_stats - arrays of len max - min + 1. Returning rx, tx updated absolute values. // min, max - minimum, maximum counters range to get // reset - If true, need to reset counter value after reading -int CPhyEthIF::get_flow_stats(uint64_t *rx_stats, tx_per_flow_t *tx_stats, int min, int max, bool reset) { - uint32_t diff_stats[MAX_FLOW_STATS]; - - if (get_ex_drv()->get_rx_stats(this, diff_stats, m_stats.m_fdir_prev_stats, min, max) < 0) { - return -1; +int CPhyEthIF::get_flow_stats(rx_per_flow_t *rx_stats, tx_per_flow_t *tx_stats, int min, int max, bool reset) { + uint32_t diff_pkts[MAX_FLOW_STATS]; + uint32_t diff_bytes[MAX_FLOW_STATS]; + bool hw_rx_stat_supported = get_ex_drv()->hw_rx_stat_supported(); + + if (hw_rx_stat_supported) { + if (get_ex_drv()->get_rx_stats(this, diff_pkts, m_stats.m_fdir_prev_pkts + , diff_bytes, m_stats.m_fdir_prev_bytes, min, max) < 0) { + return -1; + } + } else { + g_trex.m_rx_sl.get_rx_stats(get_port_id(), rx_stats, min, max, reset); } for (int i = min; i <= max; i++) { if ( reset ) { // return value so far, and reset - if (rx_stats != NULL) { - rx_stats[i - min] = m_stats.m_rx_per_flow[i] + diff_stats[i]; + if (hw_rx_stat_supported) { + if (rx_stats != NULL) { + rx_stats[i - min].set_pkts(m_stats.m_rx_per_flow_pkts[i] + diff_pkts[i]); + rx_stats[i - min].set_bytes(m_stats.m_rx_per_flow_bytes[i] + diff_bytes[i]); + } + m_stats.m_rx_per_flow_pkts[i] = 0; + m_stats.m_rx_per_flow_bytes[i] = 0; } if (tx_stats != NULL) { tx_stats[i - min] = g_trex.clear_flow_tx_stats(m_port_id, i); } - m_stats.m_rx_per_flow[i] = 0; } else { - m_stats.m_rx_per_flow[i] += diff_stats[i]; - if (rx_stats != NULL) { - rx_stats[i - min] = m_stats.m_rx_per_flow[i]; + if (hw_rx_stat_supported) { + m_stats.m_rx_per_flow_pkts[i] += diff_pkts[i]; + m_stats.m_rx_per_flow_bytes[i] += diff_bytes[i]; + if (rx_stats != NULL) { + rx_stats[i - min].set_pkts(m_stats.m_rx_per_flow_pkts[i]); + rx_stats[i - min].set_bytes(m_stats.m_rx_per_flow_bytes[i]); + } } if (tx_stats != NULL) { tx_stats[i - min] = g_trex.get_flow_tx_stats(m_port_id, i); @@ -3978,6 +4063,8 @@ int CPhyEthIF::get_flow_stats(uint64_t *rx_stats, tx_per_flow_t *tx_stats, int m return 0; } +// If needed, send packets to rx core for processing. +// This is relevant only in VM case, where we receive packets to the working DP core (only 1 DP core in this case) bool CCoreEthIF::process_rx_pkt(pkt_dir_t dir, rte_mbuf_t * m){ @@ -3986,17 +4073,25 @@ bool CCoreEthIF::process_rx_pkt(pkt_dir_t dir, return false; } bool send=false; - CLatencyPktMode *c_l_pkt_mode = g_trex.m_mg.c_l_pkt_mode; - bool is_lateancy_pkt = c_l_pkt_mode->IsLatencyPkt(parser.m_ipv4) & parser.IsLatencyPkt(parser.m_l4 + c_l_pkt_mode->l4_header_len()); - if (is_lateancy_pkt){ - send=true; - }else{ - if ( get_is_rx_filter_enable() ){ - uint8_t max_ttl = 0xff - get_rx_check_hops(); - uint8_t pkt_ttl = parser.getTTl(); - if ( (pkt_ttl==max_ttl) || (pkt_ttl==(max_ttl-1) ) ) { - send=true; + if ( get_is_stateless() ) { + // In stateless RX, we only care about flow stat packets + if ((parser.getIpId() & 0xff00) == IP_ID_RESERVE_BASE) { + send = true; + } + } else { + CLatencyPktMode *c_l_pkt_mode = g_trex.m_mg.c_l_pkt_mode; + bool is_lateancy_pkt = c_l_pkt_mode->IsLatencyPkt(parser.m_ipv4) & parser.IsLatencyPkt(parser.m_l4 + c_l_pkt_mode->l4_header_len()); + + if (is_lateancy_pkt) { + send = true; + } else { + if ( get_is_rx_filter_enable() ) { + uint8_t max_ttl = 0xff - get_rx_check_hops(); + uint8_t pkt_ttl = parser.getTTl(); + if ( (pkt_ttl==max_ttl) || (pkt_ttl==(max_ttl-1) ) ) { + send=true; + } } } } @@ -4036,7 +4131,6 @@ static int latency_one_lcore(__attribute__((unused)) void *dummy) CPlatformSocketInfo * lpsock=&CGlobalInfo::m_socket; physical_thread_id_t phy_id =rte_lcore_id(); - if ( lpsock->thread_phy_is_rx(phy_id) ) { g_trex.run_in_rx_core(); }else{ @@ -4060,7 +4154,6 @@ static int slave_one_lcore(__attribute__((unused)) void *dummy) CPlatformSocketInfo * lpsock=&CGlobalInfo::m_socket; physical_thread_id_t phy_id =rte_lcore_id(); - if ( lpsock->thread_phy_is_rx(phy_id) ) { g_trex.run_in_rx_core(); }else{ @@ -4387,7 +4480,7 @@ int main_test(int argc , char * argv[]){ && (CGlobalInfo::m_options.m_latency_prev > 0)) { uint32_t pkts = CGlobalInfo::m_options.m_latency_prev * CGlobalInfo::m_options.m_latency_rate; - printf("Start prev latency check- for %d sec \n",CGlobalInfo::m_options.m_latency_prev); + printf("Starting pre latency check for %d sec\n",CGlobalInfo::m_options.m_latency_prev); g_trex.m_mg.start(pkts); delay(CGlobalInfo::m_options.m_latency_prev* 1000); printf("Finished \n"); @@ -4395,6 +4488,7 @@ int main_test(int argc , char * argv[]){ g_trex.reset_counters(); } + g_trex.m_rx_running = false; if ( get_is_stateless() ) { g_trex.start_master_stateless(); @@ -4448,6 +4542,12 @@ int CTRexExtendedDriverBase::configure_drop_queue(CPhyEthIF * _if) { return (rte_eth_dev_rx_queue_stop(port_id, 0)); } +CFlowStatParser *CTRexExtendedDriverBase::get_flow_stat_parser() { + CFlowStatParser *parser = new CFlowStatParser(); + assert (parser); + return parser; +} + void wait_x_sec(int sec) { int i; printf(" wait %d sec ", sec); @@ -4610,7 +4710,7 @@ int CTRexExtendedDriverBase1G::configure_rx_filter_rules_stateless(CPhyEthIF * _ } rule_id = 0; - // filter for byte 18 of packet (lsb of IP ID) should equal ff + // filter for byte 18 of packet (msb of IP ID) should equal ff _if->pci_reg_write( (E1000_FHFT(rule_id)+(2*16)) , 0x00ff0000); _if->pci_reg_write( (E1000_FHFT(rule_id)+(2*16) + 8) , 0x04); /* MASK */ // + bytes 12 + 13 (ether type) should indicate IP. @@ -4682,7 +4782,13 @@ void CTRexExtendedDriverBase1G::get_extended_stats(CPhyEthIF * _if,CPhyEthIFStat void CTRexExtendedDriverBase1G::clear_extended_stats(CPhyEthIF * _if){ } - +#if 0 +int CTRexExtendedDriverBase1G::get_rx_stats(CPhyEthIF * _if, uint32_t *pkts, uint32_t *prev_pkts + ,uint32_t *bytes, uint32_t *prev_bytes, int min, int max) { + uint32_t port_id = _if->get_port_id(); + return g_trex.m_rx_sl.get_rx_stats(port_id, pkts, prev_pkts, bytes, prev_bytes, min, max); +} +#endif void CTRexExtendedDriverBase10G::clear_extended_stats(CPhyEthIF * _if){ _if->pci_reg_read(IXGBE_RXNFGPC); @@ -4698,7 +4804,43 @@ void CTRexExtendedDriverBase10G::update_configuration(port_cfg_t * cfg){ cfg->m_tx_conf.tx_thresh.wthresh = TX_WTHRESH; } -int CTRexExtendedDriverBase10G::configure_rx_filter_rules(CPhyEthIF * _if){ +int CTRexExtendedDriverBase10G::configure_rx_filter_rules(CPhyEthIF * _if) { + if ( get_is_stateless() ) { + return configure_rx_filter_rules_stateless(_if); + } else { + return configure_rx_filter_rules_statefull(_if); + } + + return 0; +} + +int CTRexExtendedDriverBase10G::configure_rx_filter_rules_stateless(CPhyEthIF * _if) { + uint8_t port_id = _if->get_rte_port_id(); + int ip_id_lsb; + + for (ip_id_lsb = 0; ip_id_lsb < MAX_FLOW_STATS; ip_id_lsb++ ) { + struct rte_eth_fdir_filter fdir_filter; + int res = 0; + + memset(&fdir_filter,0,sizeof(fdir_filter)); + fdir_filter.input.flow_type = RTE_ETH_FLOW_NONFRAG_IPV4_OTHER; + fdir_filter.soft_id = ip_id_lsb; // We can use the ip_id_lsb also as filter soft_id + fdir_filter.input.flow_ext.flexbytes[0] = 0xff; + fdir_filter.input.flow_ext.flexbytes[1] = ip_id_lsb; + fdir_filter.action.rx_queue = 1; + fdir_filter.action.behavior = RTE_ETH_FDIR_ACCEPT; + fdir_filter.action.report_status = RTE_ETH_FDIR_NO_REPORT_STATUS; + res = rte_eth_dev_filter_ctrl(port_id, RTE_ETH_FILTER_FDIR, RTE_ETH_FILTER_ADD, &fdir_filter); + + if (res != 0) { + rte_exit(EXIT_FAILURE, " ERROR rte_eth_dev_filter_ctrl : %d\n",res); + } + } + + return 0; +} + +int CTRexExtendedDriverBase10G::configure_rx_filter_rules_statefull(CPhyEthIF * _if) { uint8_t port_id=_if->get_rte_port_id(); uint16_t hops = get_rx_check_hops(); uint16_t v4_hops = (hops << 8)&0xff00; @@ -4809,6 +4951,12 @@ int CTRexExtendedDriverBase10G::wait_for_stable_link(){ return (0); } +CFlowStatParser *CTRexExtendedDriverBase10G::get_flow_stat_parser() { + CFlowStatParser *parser = new C82599Parser(); + assert (parser); + return parser; +} + //////////////////////////////////////////////////////////////////////////////// void CTRexExtendedDriverBase40G::clear_extended_stats(CPhyEthIF * _if){ rte_eth_stats_reset(_if->get_port_id()); @@ -4939,13 +5087,24 @@ int CTRexExtendedDriverBase40G::configure_rx_filter_rules(CPhyEthIF * _if) { } } +int CTRexExtendedDriverBase40G::reset_rx_stats(CPhyEthIF * _if, uint32_t *stats) { + uint32_t diff_stats[MAX_FLOW_STATS]; + + // The HW counters start from some random values. The driver give us the diffs from previous, + // each time we do get_rx_stats. We need to make one first call, at system startup, + // and ignore the returned diffs + return get_rx_stats(_if, diff_stats, stats, NULL, NULL, 0, MAX_FLOW_STATS - 1); +} + // instead of adding this to rte_ethdev.h extern "C" int rte_eth_fdir_stats_get(uint8_t port_id, uint32_t *stats, uint32_t start, uint32_t len); // get rx stats on _if, between min and max -// prev_stats should be the previous values read from the hardware. +// prev_pkts should be the previous values read from the hardware. // Getting changed to be equal to current HW values. -// stats return the diff between prev_stats and current hw values -int CTRexExtendedDriverBase40G::get_rx_stats(CPhyEthIF * _if, uint32_t *stats, uint32_t *prev_stats, int min, int max) { +// pkts return the diff between prev_pkts and current hw values +// bytes and prev_bytes are not used. X710 fdir filters do not support byte count. +int CTRexExtendedDriverBase40G::get_rx_stats(CPhyEthIF * _if, uint32_t *pkts, uint32_t *prev_pkts + ,uint32_t *bytes, uint32_t *prev_bytes, int min, int max) { uint32_t hw_stats[MAX_FLOW_STATS]; uint32_t port_id = _if->get_port_id(); uint32_t start = (port_id % m_if_per_card) * MAX_FLOW_STATS + min; @@ -4954,13 +5113,13 @@ int CTRexExtendedDriverBase40G::get_rx_stats(CPhyEthIF * _if, uint32_t *stats, u rte_eth_fdir_stats_get(port_id, hw_stats, start, len); for (int i = loop_start; i < loop_start + len; i++) { - if (hw_stats[i - min] >= prev_stats[i]) { - stats[i] = (uint64_t)(hw_stats[i - min] - prev_stats[i]); + if (hw_stats[i - min] >= prev_pkts[i]) { + pkts[i] = (uint64_t)(hw_stats[i - min] - prev_pkts[i]); } else { // Wrap around - stats[i] = (uint64_t)((hw_stats[i - min] + ((uint64_t)1 << 32)) - prev_stats[i]); + pkts[i] = (uint64_t)((hw_stats[i - min] + ((uint64_t)1 << 32)) - prev_pkts[i]); } - prev_stats[i] = hw_stats[i - min]; + prev_pkts[i] = hw_stats[i - min]; } return 0; @@ -5025,6 +5184,12 @@ int CTRexExtendedDriverBase40G::wait_for_stable_link(){ return (0); } +CFlowStatParser *CTRexExtendedDriverBase40G::get_flow_stat_parser() { + CFlowStatParser *parser = new CFlowStatParser(); + assert (parser); + return parser; +} + ///////////////////////////////////////////////////////////////////// @@ -5144,6 +5309,9 @@ TrexDpdkPlatformApi::get_global_stats(TrexPlatformGlobalStats &stats) const { g_trex.get_stats(trex_stats); stats.m_stats.m_cpu_util = trex_stats.m_cpu_util; + if (get_is_stateless()) { + stats.m_stats.m_rx_cpu_util = trex_stats.m_rx_cpu_util; + } stats.m_stats.m_tx_bps = trex_stats.m_tx_bps; stats.m_stats.m_tx_pps = trex_stats.m_tx_pps; @@ -5197,12 +5365,6 @@ TrexDpdkPlatformApi::get_interface_info(uint8_t interface_id, intf_info_st &info /* hardware */ g_trex.m_ports[interface_id].macaddr_get(&rte_mac_addr); assert(ETHER_ADDR_LEN == 6); - printf("interface %d speed: %d mac:", interface_id, info.speed); - for (int i = 0; i < 6; i++) { - info.mac_info.hw_macaddr[i] = rte_mac_addr.addr_bytes[i]; - printf("%x:", rte_mac_addr.addr_bytes[i]); - } - printf("\n"); /* software */ uint8_t sw_macaddr[12]; @@ -5235,8 +5397,8 @@ TrexDpdkPlatformApi::get_interface_stat_info(uint8_t interface_id, uint16_t &num capabilities = CTRexExtendedDriverDb::Ins()->get_drv()->get_rx_stat_capabilities(); } -int TrexDpdkPlatformApi::get_flow_stats(uint8 port_id, uint64_t *rx_stats, void *tx_stats, int min, int max, bool reset) const { - return g_trex.m_ports[port_id].get_flow_stats(rx_stats, (tx_per_flow_t *)tx_stats, min, max, reset); +int TrexDpdkPlatformApi::get_flow_stats(uint8 port_id, void *rx_stats, void *tx_stats, int min, int max, bool reset) const { + return g_trex.m_ports[port_id].get_flow_stats((rx_per_flow_t *)rx_stats, (tx_per_flow_t *)tx_stats, min, max, reset); } int TrexDpdkPlatformApi::reset_hw_flow_stats(uint8_t port_id) const { @@ -5268,3 +5430,8 @@ void TrexDpdkPlatformApi::flush_dp_messages() const { int TrexDpdkPlatformApi::get_active_pgids(flow_stat_active_t &result) const { return g_trex.m_trex_stateless->m_rx_flow_stat.get_active_pgids(result); } + +CFlowStatParser *TrexDpdkPlatformApi::get_flow_stat_parser() const { + return CTRexExtendedDriverDb::Ins()->get_drv() + ->get_flow_stat_parser(); +} diff --git a/src/main_dpdk.h b/src/main_dpdk.h index a475d321..ff1ea784 100644 --- a/src/main_dpdk.h +++ b/src/main_dpdk.h @@ -38,9 +38,11 @@ class CPhyEthIFStats { uint64_t oerrors; /**< Total number of failed transmitted packets. */ uint64_t imcasts; /**< Total number of multicast received packets. */ uint64_t rx_nombuf; /**< Total number of RX mbuf allocation failures. */ - uint64_t m_rx_per_flow [MAX_FLOW_STATS]; // Per flow RX statistics - // Previous fdir stats values read from HW. Since on xl710 this is 32 bit, we save old value, to handle wrap around. - uint32_t m_fdir_prev_stats [MAX_FLOW_STATS]; + uint64_t m_rx_per_flow_pkts [MAX_FLOW_STATS]; // Per flow RX pkts + uint64_t m_rx_per_flow_bytes[MAX_FLOW_STATS]; // Per flow RX bytes + // Previous fdir stats values read from driver. Since on xl710 this is 32 bit, we save old value, to handle wrap around. + uint32_t m_fdir_prev_pkts [MAX_FLOW_STATS]; + uint32_t m_fdir_prev_bytes [MAX_FLOW_STATS]; public: void Clear(); void Dump(FILE *fd); @@ -73,7 +75,7 @@ class CPhyEthIF { void get_stats(CPhyEthIFStats *stats); int dump_fdir_global_stats(FILE *fd); int reset_hw_flow_stats(); - int get_flow_stats(uint64_t *rx_stats, tx_per_flow_t *tx_stats, int min, int max, bool reset); + int get_flow_stats(rx_per_flow_t *rx_stats, tx_per_flow_t *tx_stats, int min, int max, bool reset); void get_stats_1g(CPhyEthIFStats *stats); void rx_queue_setup(uint16_t rx_queue_id, uint16_t nb_rx_desc, diff --git a/src/msg_manager.cpp b/src/msg_manager.cpp index 9ade1bfc..7e39391a 100755 --- a/src/msg_manager.cpp +++ b/src/msg_manager.cpp @@ -4,7 +4,7 @@ */ /* -Copyright (c) 2015-2015 Cisco Systems, Inc. +Copyright (c) 2015-2016 Cisco Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -65,12 +65,12 @@ void CMessagingManager::Delete(){ delete [] m_dp_to_cp; m_dp_to_cp = NULL; } - + if (m_cp_to_dp) { delete [] m_cp_to_dp; m_cp_to_dp = NULL; } - + } CNodeRing * CMessagingManager::getRingCpToDp(uint8_t thread_id){ @@ -84,7 +84,6 @@ CNodeRing * CMessagingManager::getRingDpToCp(uint8_t thread_id){ } - void CMsgIns::Free(){ if (m_ins) { m_ins->Delete(); @@ -107,6 +106,11 @@ bool CMsgIns::Create(uint8_t num_threads){ if (!res) { return (res); } + res = m_cp_rx.Create(1, "cp_rx"); + if (!res) { + return (res); + } + return (m_rx_dp.Create(num_threads,"rx_dp")); } @@ -114,9 +118,8 @@ bool CMsgIns::Create(uint8_t num_threads){ void CMsgIns::Delete(){ m_cp_dp.Delete(); m_rx_dp.Delete(); + m_cp_rx.Delete(); } -CMsgIns * CMsgIns::m_ins=0; - - +CMsgIns * CMsgIns::m_ins=0; diff --git a/src/msg_manager.h b/src/msg_manager.h index 0390ce10..de11edbd 100755 --- a/src/msg_manager.h +++ b/src/msg_manager.h @@ -6,7 +6,7 @@ */ /* -Copyright (c) 2015-2015 Cisco Systems, Inc. +Copyright (c) 2015-2016 Cisco Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -40,37 +40,37 @@ public: /* -e.g DP with 4 threads -will look like this +e.g DP with 4 threads +will look like this - cp_to_dp + cp_to_dp master :push dpx : pop - + - --> dp0 cp - --> dp1 - --> dp2 - --> dp3 - dp_to_cp + dp_to_cp cp : pop dpx : push - + <- -- dp0 cp <- -- dp1 <- -- dp2 <- -- dp3 -*/ +*/ class CGenNode ; typedef CTRingSp<CGenNode> CNodeRing; -/* CP == latency thread +/* CP == latency thread DP == traffic pkt generator */ class CMessagingManager { public: @@ -83,6 +83,7 @@ public: void Delete(); CNodeRing * getRingCpToDp(uint8_t thread_id); CNodeRing * getRingDpToCp(uint8_t thread_id); + CNodeRing * getRingCpToRx(); uint8_t get_num_threads(){ return (m_num_dp_threads); } @@ -106,6 +107,9 @@ public: CMessagingManager * getCpDp(){ return (&m_cp_dp); } + CMessagingManager * getCpRx(){ + return (&m_cp_rx); + } uint8_t get_num_threads(){ return (m_rx_dp.get_num_threads()); @@ -114,11 +118,11 @@ public: private: CMessagingManager m_rx_dp; CMessagingManager m_cp_dp; - + CMessagingManager m_cp_rx; private: /* one instance */ - static CMsgIns * m_ins; + static CMsgIns * m_ins; }; #endif diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index 9e24802b..9df57a50 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -132,6 +132,7 @@ TrexStateless::encode_stats(Json::Value &global) { api->get_global_stats(stats); global["cpu_util"] = stats.m_stats.m_cpu_util; + global["rx_cpu_util"] = stats.m_stats.m_rx_cpu_util; global["tx_bps"] = stats.m_stats.m_tx_bps; global["rx_bps"] = stats.m_stats.m_rx_bps; diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 5947aaf7..90589d7a 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -473,6 +473,13 @@ TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBas ring->Enqueue((CGenNode *)msg); } +void +TrexStatelessPort::send_message_to_rx(TrexStatelessCpToRxMsgBase *msg) { + + /* send the message to the core */ + CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); + ring->Enqueue((CGenNode *)msg); +} uint64_t TrexStatelessPort::get_port_speed_bps() const { diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index d3c4dcb9..7e1838d4 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -4,7 +4,7 @@ */ /* -Copyright (c) 2015-2015 Cisco Systems, Inc. +Copyright (c) 2015-2016 Cisco Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -21,20 +21,21 @@ limitations under the License. #ifndef __TREX_STATELESS_PORT_H__ #define __TREX_STATELESS_PORT_H__ -#include <trex_stream.h> -#include <trex_dp_port_events.h> -#include <internal_api/trex_platform_api.h> +#include "internal_api/trex_platform_api.h" +#include "trex_dp_port_events.h" +#include "trex_stream.h" class TrexStatelessCpToDpMsgBase; +class TrexStatelessCpToRxMsgBase; class TrexStreamsGraphObj; class TrexPortMultiplier; -/** +/** * TRex port owner can perform * write commands * while port is owned - others can * do read only commands - * + * */ class TrexPortOwner { public: @@ -92,7 +93,7 @@ private: /* handler genereated internally */ std::string m_handler; - + /* seed for generating random values */ unsigned int m_seed; @@ -106,7 +107,7 @@ class AsyncStopEvent; /** * describes a stateless port - * + * * @author imarom (31-Aug-15) */ class TrexStatelessPort { @@ -137,9 +138,9 @@ public: RC_ERR_FAILED_TO_COMPILE_STREAMS }; - + TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api); - + ~TrexStatelessPort(); /** @@ -155,11 +156,11 @@ public: void release(void); /** - * validate the state of the port before start - * it will return a stream graph - * containing information about the streams - * configured on this port - * + * validate the state of the port before start + * it will return a stream graph + * containing information about the streams + * configured on this port + * * on error it throws TrexException */ const TrexStreamsGraphObj *validate(void); @@ -190,13 +191,13 @@ public: /** * update current traffic on port - * + * */ void update_traffic(const TrexPortMultiplier &mul, bool force); /** * get the port state - * + * */ port_state_e get_state() const { return m_port_state; @@ -204,23 +205,23 @@ public: /** * port state as string - * + * */ std::string get_state_as_string() const; /** * the the max stream id currently assigned - * + * */ int get_max_stream_id() const; /** * fill up properties of the port - * + * * @author imarom (16-Sep-15) - * - * @param driver - * @param speed + * + * @param driver + * @param speed */ void get_properties(std::string &driver, TrexPlatformApi::driver_speed_e &speed); @@ -237,7 +238,7 @@ public: /** * delegators - * + * */ void add_stream(TrexStream *stream); @@ -267,7 +268,7 @@ public: /** * returns the number of DP cores linked to this port - * + * */ uint8_t get_dp_core_count() { return m_cores_id_list.size(); @@ -275,7 +276,7 @@ public: /** * returns the traffic multiplier currently being used by the DP - * + * */ double get_multiplier() { return (m_factor); @@ -283,13 +284,13 @@ public: /** * get port speed in bits per second - * + * */ uint64_t get_port_speed_bps() const; /** * return RX caps - * + * */ int get_rx_caps() const { return m_rx_caps; @@ -300,12 +301,12 @@ public: } /** - * return true if port adds CRC to a packet (not occurs for - * VNICs) - * + * return true if port adds CRC to a packet (not occurs for + * VNICs) + * * @author imarom (24-Feb-16) - * - * @return bool + * + * @return bool */ bool has_crc_added() const { return m_api_info.has_crc; @@ -318,9 +319,9 @@ public: /** * get the port effective rate (on a started / paused port) - * + * * @author imarom (07-Jan-16) - * + * */ void get_port_effective_rate(double &pps, double &bps_L1, @@ -330,8 +331,8 @@ public: /** * set port promiscuous on/off - * - * @param enabled + * + * @param enabled */ void set_promiscuous(bool enabled); bool get_promiscuous(); @@ -357,40 +358,45 @@ private: /** * send message to all cores using duplicate - * + * */ void send_message_to_all_dp(TrexStatelessCpToDpMsgBase *msg); /** * send message to specific DP core - * + * */ void send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg); + /** + * send message to specific RX core + * + */ + void send_message_to_rx(TrexStatelessCpToRxMsgBase *msg); /** * when a port stops, perform various actions - * + * */ void common_port_stop_actions(bool async); /** * calculate effective M per core - * + * */ double calculate_effective_factor(const TrexPortMultiplier &mul, bool force = false); double calculate_effective_factor_internal(const TrexPortMultiplier &mul); - + /** * generates a graph of streams graph - * + * */ void generate_streams_graph(); /** * dispose of it - * + * * @author imarom (26-Nov-15) */ void delete_streams_graph(); @@ -426,7 +432,7 @@ private: /** * port multiplier object - * + * */ class TrexPortMultiplier { public: @@ -443,8 +449,8 @@ public: }; /** - * multiplier can be absolute value - * increment value or subtract value + * multiplier can be absolute value + * increment value or subtract value */ enum mul_op_e { OP_ABS, diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp index be5002da..563236c2 100644 --- a/src/stateless/cp/trex_streams_compiler.cpp +++ b/src/stateless/cp/trex_streams_compiler.cpp @@ -477,7 +477,8 @@ TrexStreamsCompiler::compile_stream(TrexStream *stream, TrexStream *fixed_rx_flow_stat_stream = stream->clone(true); - get_stateless_obj()->m_rx_flow_stat.start_stream(fixed_rx_flow_stat_stream, fixed_rx_flow_stat_stream->m_rx_check.m_hw_id); //???? check for errors + // not checking for errors. We assume that if add_stream succeeded, start_stream will too. + get_stateless_obj()->m_rx_flow_stat.start_stream(fixed_rx_flow_stat_stream, fixed_rx_flow_stat_stream->m_rx_check.m_hw_id); /* can this stream be split to many cores ? */ if (!stream->is_splitable(dp_core_count)) { diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index f8d6d828..ba25f61d 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -5,7 +5,7 @@ */ /* -Copyright (c) 2015-2015 Cisco Systems, Inc. +Copyright (c) 2015-2016 Cisco Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -19,14 +19,12 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include <trex_stateless_dp_core.h> -#include <trex_stateless_messaging.h> -#include <trex_streams_compiler.h> -#include <trex_stream_node.h> -#include <trex_stream.h> - -#include <bp_sim.h> - +#include "bp_sim.h" +#include "trex_stateless_dp_core.h" +#include "trex_stateless_messaging.h" +#include "trex_stream.h" +#include "trex_stream_node.h" +#include "trex_streams_compiler.h" void CDpOneStream::Delete(CFlowGenListPerThread * core){ assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE); diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index 333aec88..7edf0f13 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -5,7 +5,7 @@ */ /* -Copyright (c) 2015-2015 Cisco Systems, Inc. +Copyright (c) 2015-2016 Cisco Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -19,17 +19,18 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include <trex_stateless_messaging.h> -#include <trex_stateless_dp_core.h> -#include <trex_streams_compiler.h> -#include <trex_stateless.h> -#include <bp_sim.h> - #include <string.h> +#include "trex_stateless_messaging.h" +#include "trex_stateless_dp_core.h" +#include "trex_stateless_rx_core.h" +#include "trex_streams_compiler.h" +#include "trex_stateless.h" +#include "bp_sim.h" + /************************* start traffic message - ************************/ + ************************/ TrexStatelessDpStart::TrexStatelessDpStart(uint8_t port_id, int event_id, TrexStreamsCompiledObj *obj, double duration) { m_port_id = port_id; m_event_id = event_id; @@ -40,7 +41,7 @@ TrexStatelessDpStart::TrexStatelessDpStart(uint8_t port_id, int event_id, TrexSt /** * clone for DP start message - * + * */ TrexStatelessCpToDpMsgBase * TrexStatelessDpStart::clone() { @@ -69,7 +70,7 @@ TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) { /************************* stop traffic message - ************************/ + ************************/ bool TrexStatelessDpStop::handle(TrexStatelessDpCore *dp_core) { @@ -114,7 +115,7 @@ bool TrexStatelessDpResume::handle(TrexStatelessDpCore *dp_core){ /** * clone for DP stop message - * + * */ TrexStatelessCpToDpMsgBase * TrexStatelessDpStop::clone() { @@ -130,7 +131,7 @@ TrexStatelessDpStop::clone() { -TrexStatelessCpToDpMsgBase * +TrexStatelessCpToDpMsgBase * TrexStatelessDpQuit::clone(){ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpQuit(); @@ -140,7 +141,7 @@ TrexStatelessDpQuit::clone(){ bool TrexStatelessDpQuit::handle(TrexStatelessDpCore *dp_core){ - + /* quit */ dp_core->quit_main_loop(); return (true); @@ -155,7 +156,7 @@ bool TrexStatelessDpCanQuit::handle(TrexStatelessDpCore *dp_core){ return (true); } -TrexStatelessCpToDpMsgBase * +TrexStatelessCpToDpMsgBase * TrexStatelessDpCanQuit::clone(){ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpCanQuit(); @@ -165,7 +166,7 @@ TrexStatelessDpCanQuit::clone(){ /************************* update traffic message - ************************/ + ************************/ bool TrexStatelessDpUpdate::handle(TrexStatelessDpCore *dp_core) { dp_core->update_traffic(m_port_id, m_factor); @@ -207,3 +208,18 @@ TrexDpPortEventMsg::handle() { return (true); } +/************************* messages from CP to RX **********************/ +bool TrexStatelessRxStartMsg::handle (CRxCoreStateless *rx_core) { + rx_core->work(); + return true; +} + +bool TrexStatelessRxStopMsg::handle (CRxCoreStateless *rx_core) { + rx_core->idle(); + return true; +} + +bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) { + rx_core->quit(); + return true; +} diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index dda086b7..0eed01bd 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -5,7 +5,7 @@ */ /* -Copyright (c) 2015-2015 Cisco Systems, Inc. +Copyright (c) 2015-2016 Cisco Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -22,16 +22,17 @@ limitations under the License. #ifndef __TREX_STATELESS_MESSAGING_H__ #define __TREX_STATELESS_MESSAGING_H__ -#include <msg_manager.h> -#include <trex_dp_port_events.h> +#include "msg_manager.h" +#include "trex_dp_port_events.h" class TrexStatelessDpCore; +class CRxCoreStateless; class TrexStreamsCompiledObj; class CFlowGenListPerThread; /** * defines the base class for CP to DP messages - * + * * @author imarom (27-Oct-15) */ class TrexStatelessCpToDpMsgBase { @@ -49,7 +50,7 @@ public: /** * clone the current message - * + * */ virtual TrexStatelessCpToDpMsgBase * clone() = 0; @@ -76,7 +77,7 @@ protected: /** * a message to start traffic - * + * * @author imarom (27-Oct-15) */ class TrexStatelessDpStart : public TrexStatelessCpToDpMsgBase { @@ -137,7 +138,7 @@ private: /** * a message to stop traffic - * + * * @author imarom (27-Oct-15) */ class TrexStatelessDpStop : public TrexStatelessCpToDpMsgBase { @@ -191,9 +192,9 @@ private: }; /** - * a message to Quit the datapath traffic. support only stateless for now - * - * @author hhaim + * a message to Quit the datapath traffic. support only stateless for now + * + * @author hhaim */ class TrexStatelessDpQuit : public TrexStatelessCpToDpMsgBase { public: @@ -209,9 +210,9 @@ public: }; /** - * a message to check if both port are idel and exit - * - * @author hhaim + * a message to check if both port are idel and exit + * + * @author hhaim */ class TrexStatelessDpCanQuit : public TrexStatelessCpToDpMsgBase { public: @@ -247,7 +248,7 @@ private: /** * barrier message for DP core - * + * */ class TrexStatelessDpBarrier : public TrexStatelessCpToDpMsgBase { public: @@ -270,7 +271,7 @@ private: /** * defines the base class for CP to DP messages - * + * * @author imarom (27-Oct-15) */ class TrexStatelessDpToCpMsgBase { @@ -284,7 +285,7 @@ public: /** * virtual function to handle a message - * + * */ virtual bool handle() = 0; @@ -295,9 +296,9 @@ public: /** - * a message indicating an event has happened on a port at the - * DP - * + * a message indicating an event has happened on a port at the + * DP + * */ class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase { public: @@ -326,8 +327,45 @@ private: int m_thread_id; uint8_t m_port_id; int m_event_id; - + }; -#endif /* __TREX_STATELESS_MESSAGING_H__ */ +/************************* messages from CP to RX **********************/ +/** + * defines the base class for CP to RX messages + * + */ +class TrexStatelessCpToRxMsgBase { +public: + + TrexStatelessCpToRxMsgBase() { + } + + virtual ~TrexStatelessCpToRxMsgBase() { + } + + /** + * virtual function to handle a message + * + */ + virtual bool handle (CRxCoreStateless *rx_core) = 0; + + /* no copy constructor */ + TrexStatelessCpToRxMsgBase(TrexStatelessCpToRxMsgBase &) = delete; + +}; + +class TrexStatelessRxStartMsg : public TrexStatelessCpToRxMsgBase { + bool handle (CRxCoreStateless *rx_core); +}; + +class TrexStatelessRxStopMsg : public TrexStatelessCpToRxMsgBase { + bool handle (CRxCoreStateless *rx_core); +}; + +class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase { + bool handle (CRxCoreStateless *rx_core); +}; + +#endif /* __TREX_STATELESS_MESSAGING_H__ */ diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp new file mode 100644 index 00000000..929ad7fa --- /dev/null +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -0,0 +1,217 @@ +#include <stdio.h> +#include "bp_sim.h" +#include "flow_stat_parser.h" +#include "latency.h" +#include "trex_stateless_messaging.h" +#include "trex_stateless_rx_core.h" + +void CRxCoreStateless::create(const CRxSlCfg &cfg) { + m_max_ports = cfg.m_max_ports; + + CMessagingManager * cp_rx = CMsgIns::Ins()->getCpRx(); + + m_ring_from_cp = cp_rx->getRingCpToDp(0); + m_ring_to_cp = cp_rx->getRingDpToCp(0); + m_state = STATE_IDLE; + + for (int i = 0; i < m_max_ports; i++) { + CLatencyManagerPerPort * lp = &m_ports[i]; + lp->m_io = cfg.m_ports[i]; + } + m_cpu_cp_u.Create(&m_cpu_dp_u); +} + +void CRxCoreStateless::handle_cp_msg(TrexStatelessCpToRxMsgBase *msg) { + msg->handle(this); + delete msg; +} + +bool CRxCoreStateless::periodic_check_for_cp_messages() { + /* fast path */ + if ( likely ( m_ring_from_cp->isEmpty() ) ) { + return false; + } + + while ( true ) { + CGenNode * node = NULL; + + if (m_ring_from_cp->Dequeue(node) != 0) { + break; + } + assert(node); + TrexStatelessCpToRxMsgBase * msg = (TrexStatelessCpToRxMsgBase *)node; + handle_cp_msg(msg); + } + + return true; + +} + +void CRxCoreStateless::idle_state_loop() { + const int SHORT_DELAY_MS = 2; + const int LONG_DELAY_MS = 50; + const int DEEP_SLEEP_LIMIT = 2000; + + int counter = 0; + + while (m_state == STATE_IDLE) { + bool had_msg = periodic_check_for_cp_messages(); + if (had_msg) { + counter = 0; + continue; + } + + /* enter deep sleep only if enough time had passed */ + if (counter < DEEP_SLEEP_LIMIT) { + delay(SHORT_DELAY_MS); + counter++; + } else { + delay(LONG_DELAY_MS); + } + } +} + +void CRxCoreStateless::start() { + static int count = 0; + static int i = 0; + bool do_try_rx_queue =CGlobalInfo::m_options.preview.get_vm_one_queue_enable() ? true : false; + + while (true) { + if (m_state == STATE_WORKING) { + i++; + if (i == 100) { + i = 0; + // if no packets in 100 cycles, sleep for a while to spare the cpu + if (count == 0) { + delay(1); + } + count = 0; + periodic_check_for_cp_messages(); // m_state might change in here + } + } else { + if (m_state == STATE_QUIT) + break; + idle_state_loop(); + } + if (do_try_rx_queue) { + try_rx_queues(); + } + count += try_rx(); + } +} + +void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPort *lp, rte_mbuf_t *m) { + CFlowStatParser parser; + + if (parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) { + uint16_t ip_id; + if (parser.get_ip_id(ip_id) == 0) { + if (is_flow_stat_id(ip_id)) { + uint16_t hw_id = get_hw_id(ip_id); + lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1); + lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len); + } + } + } +} + +// In VM setup, handle packets coming as messages from DP cores. +void CRxCoreStateless::handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r) { + while ( true ) { + CGenNode * node; + if ( r->Dequeue(node) != 0 ) { + break; + } + assert(node); + + CGenNodeMsgBase * msg = (CGenNodeMsgBase *)node; + CGenNodeLatencyPktInfo * l_msg; + uint8_t msg_type = msg->m_msg_type; + uint8_t rx_port_index; + CLatencyManagerPerPort * lp; + + switch (msg_type) { + case CGenNodeMsgBase::LATENCY_PKT: + l_msg = (CGenNodeLatencyPktInfo *)msg; + assert(l_msg->m_latency_offset == 0xdead); + rx_port_index = (thread_id << 1) + (l_msg->m_dir & 1); + assert( rx_port_index < m_max_ports ); + lp = &m_ports[rx_port_index]; + handle_rx_pkt(lp, (rte_mbuf_t *)l_msg->m_pkt); + break; + default: + printf("ERROR latency-thread message type is not valid %d \n", msg_type); + assert(0); + } + + CGlobalInfo::free_node(node); + } +} + +// VM mode function. Handle messages from DP +void CRxCoreStateless::try_rx_queues() { + + CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp(); + uint8_t threads=CMsgIns::Ins()->get_num_threads(); + int ti; + for (ti = 0; ti < (int)threads; ti++) { + CNodeRing * r = rx_dp->getRingDpToCp(ti); + if ( ! r->isEmpty() ) { + handle_rx_queue_msgs((uint8_t)ti, r); + } + } +} + +int CRxCoreStateless::try_rx() { + rte_mbuf_t * rx_pkts[64]; + int i, total_pkts = 0; + for (i = 0; i < m_max_ports; i++) { + CLatencyManagerPerPort * lp = &m_ports[i]; + rte_mbuf_t * m; + m_cpu_dp_u.start_work(); + /* try to read 64 packets clean up the queue */ + uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64); + total_pkts += cnt_p; + if (cnt_p) { + int j; + for (j = 0; j < cnt_p; j++) { + m = rx_pkts[j]; + handle_rx_pkt(lp, m); + rte_pktmbuf_free(m); + } + /* commit only if there was work to do ! */ + m_cpu_dp_u.commit(); + }/* if work */ + }// all ports + return total_pkts; +} + +bool CRxCoreStateless::is_flow_stat_id(uint16_t id) { + if ((id & 0xff00) == IP_ID_RESERVE_BASE) return true; + return false; +} + +uint16_t CRxCoreStateless::get_hw_id(uint16_t id) { + return (0x00ff & id); +} + +void CRxCoreStateless::reset_rx_stats(uint8_t port_id) { + for (int hw_id = 0; hw_id < MAX_FLOW_STATS; hw_id++) { + m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear(); + } +} + +int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset) { + for (int hw_id = min; hw_id <= max; hw_id++) { + rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat[hw_id]; + if (reset) { + m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear(); + } + } + return 0; +} + +double CRxCoreStateless::get_cpu_util() { + m_cpu_cp_u.Update(); + return m_cpu_cp_u.GetVal(); +} diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h new file mode 100644 index 00000000..5ab12f4e --- /dev/null +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -0,0 +1,80 @@ +/* + Ido Barnea + Cisco Systems, Inc. +*/ + +/* + Copyright (c) 2016-2016 Cisco Systems, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +#ifndef __TREX_STATELESS_RX_CORE_H__ +#define __TREX_STATELESS_RX_CORE_H__ +#include <stdint.h> +#include "latency.h" +#include "utl_cpuu.h" + +class TrexStatelessCpToRxMsgBase; + +class CRxSlCfg { + public: + CRxSlCfg (){ + m_max_ports = 0; + m_cps = 0.0; + } + + public: + uint32_t m_max_ports; + double m_cps; + CPortLatencyHWBase * m_ports[TREX_MAX_PORTS]; +}; + +class CRxCoreStateless { + enum state_e { + STATE_IDLE, + STATE_WORKING, + STATE_QUIT + }; + + public: + void start(); + void create(const CRxSlCfg &cfg); + void reset_rx_stats(uint8_t port_id); + int get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset); + void work() {m_state = STATE_WORKING;} + void idle() {m_state = STATE_IDLE;} + void quit() {m_state = STATE_QUIT;} + double get_cpu_util(); + + private: + void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg); + bool periodic_check_for_cp_messages(); + void idle_state_loop(); + void handle_rx_pkt(CLatencyManagerPerPort * lp, rte_mbuf_t * m); + void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r); + int try_rx(); + void try_rx_queues(); + bool is_flow_stat_id(uint16_t id); + uint16_t get_hw_id(uint16_t id); + + private: + uint32_t m_max_ports; + bool m_has_streams; + CLatencyManagerPerPort m_ports[TREX_MAX_PORTS]; + state_e m_state; /* state of all ports */ + CNodeRing *m_ring_from_cp; + CNodeRing *m_ring_to_cp; + CCpuUtlDp m_cpu_dp_u; + CCpuUtlCp m_cpu_cp_u; +}; +#endif |