From a53f6be0617721b535086298095ad49057a7be69 Mon Sep 17 00:00:00 2001 From: Ido Barnea Date: Sun, 24 Apr 2016 14:43:28 +0300 Subject: Working version. temporary send_node that duplicates the mbuf data --- src/flow_stat.cpp | 191 +++++++++++++++++++--------- src/flow_stat.h | 174 ++++++++++++++++++++++--- src/internal_api/trex_platform_api.h | 12 +- src/latency.cpp | 4 - src/latency.h | 6 +- src/main_dpdk.cpp | 142 ++++++++++++++++----- src/main_dpdk.h | 1 + src/stateless/cp/trex_stream.h | 2 +- src/stateless/dp/trex_stateless_dp_core.cpp | 12 ++ src/stateless/dp/trex_stream_node.h | 2 +- src/stateless/rx/trex_stateless_rx_core.cpp | 110 ++++++++++++++-- src/stateless/rx/trex_stateless_rx_core.h | 31 ++++- src/time_histogram.cpp | 5 +- 13 files changed, 553 insertions(+), 139 deletions(-) diff --git a/src/flow_stat.cpp b/src/flow_stat.cpp index 5640b054..52d8129c 100644 --- a/src/flow_stat.cpp +++ b/src/flow_stat.cpp @@ -63,6 +63,7 @@ stream_del: HW_ID_INIT static const uint16_t HW_ID_INIT = UINT16_MAX; static const uint16_t HW_ID_FREE = UINT16_MAX - 1; static const uint8_t PAYLOAD_RULE_PROTO = 255; +const uint16_t FLOW_STAT_PAYLOAD_IP_ID = IP_ID_RESERVE_BASE + MAX_FLOW_STATS; inline std::string methodName(const std::string& prettyFunction) { @@ -87,10 +88,10 @@ inline std::string methodName(const std::string& prettyFunction) /************** class CFlowStatUserIdInfo ***************/ CFlowStatUserIdInfo::CFlowStatUserIdInfo(uint8_t proto) { - memset(m_rx_counter, 0, sizeof(m_rx_counter)); - memset(m_rx_counter_base, 0, sizeof(m_rx_counter)); - memset(m_tx_counter, 0, sizeof(m_tx_counter)); - memset(m_tx_counter_base, 0, sizeof(m_tx_counter)); + memset(m_rx_cntr, 0, sizeof(m_rx_cntr)); + memset(m_rx_cntr_base, 0, sizeof(m_rx_cntr)); + memset(m_tx_cntr, 0, sizeof(m_tx_cntr)); + memset(m_tx_cntr_base, 0, sizeof(m_tx_cntr)); m_hw_id = UINT16_MAX; m_proto = proto; m_ref_count = 1; @@ -100,34 +101,35 @@ CFlowStatUserIdInfo::CFlowStatUserIdInfo(uint8_t proto) { m_rx_changed[i] = false; m_tx_changed[i] = false; } + m_rfc2544_support = false; } std::ostream& operator<<(std::ostream& os, const CFlowStatUserIdInfo& cf) { os << "hw_id:" << cf.m_hw_id << " proto:" << (uint16_t) cf.m_proto << " ref(" << (uint16_t) cf.m_ref_count << "," << (uint16_t) cf.m_trans_ref_count << ")"; os << " rx count ("; - os << cf.m_rx_counter[0]; + os << cf.m_rx_cntr[0]; for (int i = 1; i < TREX_MAX_PORTS; i++) { - os << "," << cf.m_rx_counter[i]; + os << "," << cf.m_rx_cntr[i]; } os << ")"; os << " rx count base("; - os << cf.m_rx_counter_base[0]; + os << cf.m_rx_cntr_base[0]; for (int i = 1; i < TREX_MAX_PORTS; i++) { - os << "," << cf.m_rx_counter_base[i]; + os << "," << cf.m_rx_cntr_base[i]; } os << ")"; os << " tx count ("; - os << cf.m_tx_counter[0]; + os << cf.m_tx_cntr[0]; for (int i = 1; i < TREX_MAX_PORTS; i++) { - os << "," << cf.m_tx_counter[i]; + os << "," << cf.m_tx_cntr[i]; } os << ")"; os << " tx count base("; - os << cf.m_tx_counter_base[0]; + os << cf.m_tx_cntr_base[0]; for (int i = 1; i < TREX_MAX_PORTS; i++) { - os << "," << cf.m_tx_counter_base[i]; + os << "," << cf.m_tx_cntr_base[i]; } os << ")"; @@ -153,10 +155,10 @@ void CFlowStatUserIdInfo::reset_hw_id() { // we are not attached to hw. Save packet count of session. // Next session will start counting from 0. for (int i = 0; i < TREX_MAX_PORTS; i++) { - m_rx_counter_base[i] += m_rx_counter[i]; - memset(&m_rx_counter[i], 0, sizeof(m_rx_counter[0])); - m_tx_counter_base[i] += m_tx_counter[i]; - memset(&m_tx_counter[i], 0, sizeof(m_tx_counter[0])); + m_rx_cntr_base[i] += m_rx_cntr[i]; + memset(&m_rx_cntr[i], 0, sizeof(m_rx_cntr[0])); + m_tx_cntr_base[i] += m_tx_cntr[i]; + memset(&m_tx_cntr[i], 0, sizeof(m_tx_cntr[0])); } } @@ -215,9 +217,9 @@ CFlowStatUserIdMap::add_user_id(uint32_t user_id, uint8_t proto) { CFlowStatUserIdInfo *new_id = new CFlowStatUserIdInfo(proto); if (proto == PAYLOAD_RULE_PROTO) { - new_id = new CFlowStatUserIdInfo(proto); - } else { new_id = new CFlowStatUserIdInfoPayload(proto); + } else { + new_id = new CFlowStatUserIdInfo(proto); } if (new_id != NULL) { std::pair ret; @@ -368,15 +370,21 @@ uint16_t CFlowStatUserIdMap::unmap(uint32_t user_id) { return UINT16_MAX; } uint16_t old_hw_id = c_user_id->get_hw_id(); - c_user_id->reset_hw_id(); + c_user_id->reset_hw_id();//??? need to call reset of CFlowStatUserIdInfoPayload if needed return old_hw_id; } /************** class CFlowStatHwIdMap ***************/ CFlowStatHwIdMap::CFlowStatHwIdMap() { - m_num_free = MAX_FLOW_STATS; - for (int i = 0; i < MAX_FLOW_STATS; i++) { + m_map = NULL; +} + +void CFlowStatHwIdMap::create(uint16_t size) { + m_map = new uint32_t[size]; + assert (m_map != NULL); + m_num_free = size; + for (int i = 0; i < size; i++) { m_map[i] = HW_ID_FREE; } } @@ -434,9 +442,11 @@ CFlowStatRuleMgr::CFlowStatRuleMgr() { m_max_hw_id_payload = -1; m_num_started_streams = 0; m_ring_to_rx = NULL; - m_capabilities = 0; + m_cap = 0; m_parser = NULL; m_rx_core = NULL; + m_hw_id_map.create(MAX_FLOW_STATS); + m_hw_id_map_payload.create(MAX_FLOW_STATS_PAYLOAD); } CFlowStatRuleMgr::~CFlowStatRuleMgr() { @@ -445,13 +455,13 @@ CFlowStatRuleMgr::~CFlowStatRuleMgr() { } void CFlowStatRuleMgr::create() { - uint16_t num_counters, capabilities; + uint16_t num_counters, cap; TrexStateless *tstateless = get_stateless_obj(); assert(tstateless); m_api = tstateless->get_platform_api(); assert(m_api); - m_api->get_interface_stat_info(0, num_counters, capabilities); + m_api->get_interface_stat_info(0, num_counters, cap); m_api->get_port_num(m_num_ports); for (uint8_t port = 0; port < m_num_ports; port++) { assert(m_api->reset_hw_flow_stats(port) == 0); @@ -461,7 +471,7 @@ void CFlowStatRuleMgr::create() { m_rx_core = get_rx_sl_core_obj(); m_parser = m_api->get_flow_stat_parser(); assert(m_parser); - m_capabilities = capabilities; + m_cap = cap; } std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf) { @@ -527,9 +537,9 @@ int CFlowStatRuleMgr::add_stream(TrexStream * stream) { //??? put back assert(stream->m_rx_check.m_hw_id == HW_ID_INIT); - uint16_t rule_type = stream->m_rx_check.m_rule_type; + TrexPlatformApi::driver_stat_cap_e rule_type = (TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type; - if ((m_capabilities & rule_type) == 0) { + if ((m_cap & rule_type) == 0) { throw TrexFStatEx("Interface does not support given rule type", TrexException::T_FLOW_STAT_BAD_RULE_TYPE_FOR_IF); } @@ -580,7 +590,7 @@ int CFlowStatRuleMgr::del_stream(TrexStream * stream) { if (! m_api) throw TrexFStatEx("Called del_stream, but no stream was added", TrexException::T_FLOW_STAT_NO_STREAMS_EXIST); - uint16_t rule_type = stream->m_rx_check.m_rule_type; + TrexPlatformApi::driver_stat_cap_e rule_type = (TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type; switch(rule_type) { case TrexPlatformApi::IF_STAT_IPV4_ID: break; @@ -661,9 +671,9 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream) { , TrexException::T_FLOW_STAT_ALREADY_STARTED); } - uint16_t rule_type = stream->m_rx_check.m_rule_type; + TrexPlatformApi::driver_stat_cap_e rule_type = (TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type; - if ((m_capabilities & rule_type) == 0) { + if ((m_cap & rule_type) == 0) { throw TrexFStatEx("Interface does not support given rule type", TrexException::T_FLOW_STAT_BAD_RULE_TYPE_FOR_IF); } @@ -711,20 +721,28 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream) { m_hw_id_map_payload.map(hw_id, user_id); } // clear hardware counters. Just in case we have garbage from previous iteration - rx_per_flow_t rx_counter; - tx_per_flow_t tx_counter; + rx_per_flow_t rx_cntr; + tx_per_flow_t tx_cntr; + rfc2544_info_t rfc2544_info; for (uint8_t port = 0; port < m_num_ports; port++) { - m_api->get_flow_stats(port, &rx_counter, (void *)&tx_counter, hw_id, hw_id, true); + m_api->get_flow_stats(port, &rx_cntr, (void *)&tx_cntr, hw_id, hw_id, true, rule_type); + } + if (rule_type == TrexPlatformApi::IF_STAT_PAYLOAD) { + m_api->get_rfc2544_info(&rfc2544_info, hw_id, hw_id, true); } } } - m_parser->set_ip_id(IP_ID_RESERVE_BASE + hw_id); - // saving given hw_id on stream for use by tx statistics count if (rule_type == TrexPlatformApi::IF_STAT_IPV4_ID) { + m_parser->set_ip_id(IP_ID_RESERVE_BASE + hw_id); stream->m_rx_check.m_hw_id = hw_id; } else { + struct flow_stat_payload_header *fsp_head = (struct flow_stat_payload_header *) + (stream->m_pkt.binary + stream->m_pkt.len - sizeof(struct flow_stat_payload_header)); + fsp_head->hw_id = hw_id; + fsp_head->magic = FLOW_STAT_PAYLOAD_MAGIC; + m_parser->set_ip_id(FLOW_STAT_PAYLOAD_IP_ID); // for payload rules, we use the range right after ip id rules stream->m_rx_check.m_hw_id = hw_id + MAX_FLOW_STATS; } @@ -783,7 +801,7 @@ int CFlowStatRuleMgr::stop_stream(TrexStream * stream) { return 0; } - uint16_t rule_type = stream->m_rx_check.m_rule_type; + TrexPlatformApi::driver_stat_cap_e rule_type = (TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type; switch(rule_type) { case TrexPlatformApi::IF_STAT_IPV4_ID: break; @@ -811,28 +829,37 @@ int CFlowStatRuleMgr::stop_stream(TrexStream * stream) { p_user_id = m_user_id_map.find_user_id(m_hw_id_map.get_user_id(hw_id)); } else { p_user_id = m_user_id_map.find_user_id(m_hw_id_map_payload.get_user_id(hw_id)); - hw_id += MAX_FLOW_STATS; } assert(p_user_id != NULL); - rx_per_flow_t rx_counter; - tx_per_flow_t tx_counter; + rx_per_flow_t rx_cntr; + tx_per_flow_t tx_cntr; + rfc2544_info_t rfc2544_info; for (uint8_t port = 0; port < m_num_ports; port++) { if (rule_type == TrexPlatformApi::IF_STAT_IPV4_ID) { m_api->del_rx_flow_stat_rule(port, FLOW_STAT_RULE_TYPE_IPV4_ID, proto, hw_id); } - m_api->get_flow_stats(port, &rx_counter, (void *)&tx_counter, hw_id, hw_id, true); + m_api->get_flow_stats(port, &rx_cntr, (void *)&tx_cntr, hw_id, hw_id, true, rule_type); // when stopping, always send counters for stopped stream one last time - p_user_id->set_rx_counter(port, rx_counter); + p_user_id->set_rx_cntr(port, rx_cntr); p_user_id->set_need_to_send_rx(port); - p_user_id->set_tx_counter(port, tx_counter); + p_user_id->set_tx_cntr(port, tx_cntr); p_user_id->set_need_to_send_tx(port); } - m_user_id_map.unmap(stream->m_rx_check.m_pg_id); + if (rule_type == TrexPlatformApi::IF_STAT_IPV4_ID) { m_hw_id_map.unmap(hw_id); } else { + CFlowStatUserIdInfoPayload *p_user_id_p = (CFlowStatUserIdInfoPayload *)p_user_id; + std::string json; + m_api->get_rfc2544_info(&rfc2544_info, hw_id, hw_id, true); + p_user_id_p->set_jitter(rfc2544_info.get_jitter()); + rfc2544_info.get_latency_json(json); + p_user_id_p->set_latency_json(json); + p_user_id_p->set_seq_err_cnt(rfc2544_info.get_seq_err_cnt()); + p_user_id_p->set_ooo_cnt(rfc2544_info.get_ooo_cnt()); m_hw_id_map_payload.unmap(hw_id); } + m_user_id_map.unmap(stream->m_rx_check.m_pg_id); } } m_num_started_streams--; @@ -871,6 +898,7 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) { rx_per_flow_t rx_stats_payload[MAX_FLOW_STATS]; tx_per_flow_t tx_stats[MAX_FLOW_STATS]; tx_per_flow_t tx_stats_payload[MAX_FLOW_STATS_PAYLOAD]; + rfc2544_info_t rfc2544_info[MAX_FLOW_STATS_PAYLOAD]; Json::FastWriter writer; Json::Value root; @@ -890,18 +918,18 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) { return true; } + m_api->get_rfc2544_info(rfc2544_info, 0, m_max_hw_id_payload, false); + // read hw counters, and update for (uint8_t port = 0; port < m_num_ports; port++) { - m_api->get_flow_stats(port, rx_stats, (void *)tx_stats, 0, m_max_hw_id, false); - m_api->get_flow_stats(port, rx_stats_payload, (void *)tx_stats_payload, MAX_FLOW_STATS, MAX_FLOW_STATS + m_max_hw_id_payload, false); + m_api->get_flow_stats(port, rx_stats, (void *)tx_stats, 0, m_max_hw_id, false, TrexPlatformApi::IF_STAT_IPV4_ID); for (int i = 0; i <= m_max_hw_id; i++) { - //??? add rx for payload rules if (rx_stats[i].get_pkts() != 0) { rx_per_flow_t rx_pkts = rx_stats[i]; CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map.get_user_id(i)); if (likely(p_user_id != NULL)) { - if (p_user_id->get_rx_counter(port) != rx_pkts) { - p_user_id->set_rx_counter(port, rx_pkts); + if (p_user_id->get_rx_cntr(port) != rx_pkts) { + p_user_id->set_rx_cntr(port, rx_pkts); p_user_id->set_need_to_send_rx(port); } } else { @@ -913,8 +941,8 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) { tx_per_flow_t tx_pkts = tx_stats[i]; CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map.get_user_id(i)); if (likely(p_user_id != NULL)) { - if (p_user_id->get_tx_counter(port) != tx_pkts) { - p_user_id->set_tx_counter(port, tx_pkts); + if (p_user_id->get_tx_cntr(port) != tx_pkts) { + p_user_id->set_tx_cntr(port, tx_pkts); p_user_id->set_need_to_send_tx(port); } } else { @@ -924,13 +952,28 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) { } } // payload rules + m_api->get_flow_stats(port, rx_stats_payload, (void *)tx_stats_payload, 0, m_max_hw_id_payload + , false, TrexPlatformApi::IF_STAT_PAYLOAD); for (int i = 0; i <= m_max_hw_id_payload; i++) { + if (rx_stats_payload[i].get_pkts() != 0) { + rx_per_flow_t rx_pkts = rx_stats_payload[i]; + CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map_payload.get_user_id(i)); + if (likely(p_user_id != NULL)) { + if (p_user_id->get_rx_cntr(port) != rx_pkts) { + p_user_id->set_rx_cntr(port, rx_pkts); + p_user_id->set_need_to_send_rx(port); + } + } else { + std::cerr << __METHOD_NAME__ << i << ":Could not count " << rx_pkts << " rx payload packets, on port " + << (uint16_t)port << ", because no mapping was found." << std::endl; + } + } if (tx_stats_payload[i].get_pkts() != 0) { tx_per_flow_t tx_pkts = tx_stats_payload[i]; CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map_payload.get_user_id(i)); if (likely(p_user_id != NULL)) { - if (p_user_id->get_tx_counter(port) != tx_pkts) { - p_user_id->set_tx_counter(port, tx_pkts); + if (p_user_id->get_tx_cntr(port) != tx_pkts) { + p_user_id->set_tx_cntr(port, tx_pkts); p_user_id->set_need_to_send_tx(port); } } else { @@ -947,8 +990,7 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) { bool send_empty = true; CFlowStatUserIdInfo *user_id_info = it->second; uint32_t user_id = it->first; - std::string str_user_id = static_cast( &(std::ostringstream() - << user_id) )->str(); + std::string str_user_id = static_cast( &(std::ostringstream() << user_id) )->str(); if (! user_id_info->was_sent()) { data_section[str_user_id]["first_time"] = true; user_id_info->set_was_sent(true); @@ -958,18 +1000,49 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) { std::string str_port = static_cast( &(std::ostringstream() << int(port) ) )->str(); if (user_id_info->need_to_send_rx(port) || baseline) { user_id_info->set_no_need_to_send_rx(port); - data_section[str_user_id]["rx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_rx_counter(port).get_pkts()); - if (m_capabilities & TrexPlatformApi::IF_STAT_RX_BYTES_COUNT) - data_section[str_user_id]["rx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_rx_counter(port).get_bytes()); + data_section[str_user_id]["rx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_rx_cntr(port).get_pkts()); + if (m_cap & TrexPlatformApi::IF_STAT_RX_BYTES_COUNT) + //???put back data_section[str_user_id]["rx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_rx_cntr(port).get_bytes()); send_empty = false; } if (user_id_info->need_to_send_tx(port) || baseline) { user_id_info->set_no_need_to_send_tx(port); - data_section[str_user_id]["tx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_tx_counter(port).get_pkts()); - data_section[str_user_id]["tx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_tx_counter(port).get_bytes()); + data_section[str_user_id]["tx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_tx_cntr(port).get_pkts()); + //??? pub back data_section[str_user_id]["tx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_tx_cntr(port).get_bytes()); send_empty = false; } } + if (user_id_info->rfc2544_support()) { + CFlowStatUserIdInfoPayload *user_id_info_p = (CFlowStatUserIdInfoPayload *)user_id_info; + // payload object. Send also latency, jitter... + std::string json; + if (user_id_info->is_hw_id()) { + // if mapped to hw_id, take info from what we just got from rx core + uint16_t hw_id = user_id_info->get_hw_id(); + rfc2544_info[hw_id].get_latency_json(json); + user_id_info_p->set_seq_err_cnt(rfc2544_info[hw_id].get_seq_err_cnt()); + user_id_info_p->set_ooo_cnt(rfc2544_info[hw_id].get_ooo_cnt()); + data_section[str_user_id]["rfc2544"][""] = json; + data_section[str_user_id]["rfc2544"]["jitter"] = rfc2544_info[hw_id].get_jitter(); + } else { + // Not mapped to hw_id. Get saved info. + user_id_info_p->get_latency_json(json); + data_section[str_user_id]["rfc2544"]["latency"] = json; + data_section[str_user_id]["rfc2544"]["jitter"] = user_id_info_p->get_jitter(); + } + data_section[str_user_id]["rfc2544"]["err_cntrs"]["lost"] + = Json::Value::UInt64(user_id_info_p->get_seq_err_cnt()); + data_section[str_user_id]["rfc2544"]["err_cntrs"]["out_of_order"] + = Json::Value::UInt64(user_id_info_p->get_ooo_cnt()); + + //??? temp - remove + data_section[str_user_id]["tx_bytes"]["0"] + = Json::Value::UInt64(user_id_info_p->get_seq_err_cnt()); + data_section[str_user_id]["tx_bytes"]["1"] = 0; + data_section[str_user_id]["rx_bytes"]["0"] + = Json::Value::UInt64(user_id_info_p->get_ooo_cnt()); + data_section[str_user_id]["rx_bytes"]["1"] = 0; + } if (send_empty) { data_section[str_user_id] = Json::objectValue; } diff --git a/src/flow_stat.h b/src/flow_stat.h index 91ee76d5..36a4bad1 100644 --- a/src/flow_stat.h +++ b/src/flow_stat.h @@ -1,4 +1,4 @@ -/* +/*/ Ido Barnea Cisco Systems, Inc. */ @@ -34,6 +34,8 @@ // Do not change this value. In i350 cards, we filter according to first byte of IP ID // In other places, we identify packets by if (ip_id > IP_ID_RESERVE_BASE) #define IP_ID_RESERVE_BASE 0xff00 +#define FLOW_STAT_PAYLOAD_MAGIC 0xABCD +extern const uint16_t FLOW_STAT_PAYLOAD_IP_ID; typedef std::map flow_stat_map_t; typedef std::map::iterator flow_stat_map_it_t; @@ -54,6 +56,92 @@ class TrexFStatEx : public TrexException { } }; + +class rfc2544_info_t_ { + friend class CFlowStatUserIdInfoPayload; + + public: + rfc2544_info_t_() { + clear(); + } + + inline void get_latency_json(std::string & json) const { + json = m_latency; + } + + inline void set_latency_json(std::string json) { + m_latency = json; + } + + inline void set_err_cntrs(uint64_t seq, uint64_t ooo) { + m_seq_error = seq; + m_out_of_order = ooo; + } + + inline uint64_t get_seq_err_cnt() { + return m_seq_error; + } + + inline uint64_t get_ooo_cnt() { + return m_out_of_order; + } + + inline double get_jitter() const { + return m_jitter; + } + + inline void set_jitter(double jitter) { + m_jitter = jitter; + } + + inline void clear() { + m_seq_error = 0; + m_out_of_order = 0; + m_jitter = 0; + m_latency = ""; + } + + inline rfc2544_info_t_ operator+ (const rfc2544_info_t_ &t_in) { + rfc2544_info_t_ t_out; + t_out.m_seq_error = this->m_seq_error + t_in.m_seq_error; + t_out.m_out_of_order = this->m_out_of_order + t_in.m_out_of_order; + return t_out; + } + + inline rfc2544_info_t_ operator- (const rfc2544_info_t_ &t_in) { + rfc2544_info_t_ t_out; + t_out.m_seq_error = this->m_seq_error - t_in.m_seq_error; + t_out.m_out_of_order = this->m_out_of_order - t_in.m_out_of_order; + return t_out; + } + + inline rfc2544_info_t_ operator+= (const rfc2544_info_t_ &t_in) { + m_seq_error += t_in.m_seq_error; + m_out_of_order += t_in.m_out_of_order; + return *this; + } + + inline bool operator!= (const rfc2544_info_t_ &t_in) { + if ((m_jitter != t_in.m_jitter) || (m_seq_error != t_in.m_seq_error) || (m_out_of_order != t_in.m_out_of_order)) + return true; + return false; + } + + friend std::ostream& operator<<(std::ostream& os, const rfc2544_info_t_ &t) { + os << "jitter:" << t.m_jitter << " errors(seq:" + << t.m_seq_error << " out of order:" << t.m_out_of_order << ")"; + return os; + } + + private: + uint64_t m_seq_error; + uint64_t m_out_of_order; + double m_jitter; + // json string of latency. In case of stop/start, we calculate latency graph from scratch, + // so when stopping, we just "freeze" state for reporting by saving the json string + std::string m_latency; +}; + class tx_per_flow_t_ { public: tx_per_flow_t_() { @@ -115,8 +203,12 @@ class tx_per_flow_t_ { private: uint64_t m_bytes; uint64_t m_pkts; + uint64_t m_seq_error_base; + uint64_t m_out_of_order_base; + }; +typedef class rfc2544_info_t_ rfc2544_info_t; typedef class tx_per_flow_t_ tx_per_flow_t; typedef class tx_per_flow_t_ rx_per_flow_t; @@ -128,13 +220,13 @@ class CFlowStatUserIdInfo { CFlowStatUserIdInfo(uint8_t proto); virtual ~CFlowStatUserIdInfo() {}; friend std::ostream& operator<<(std::ostream& os, const CFlowStatUserIdInfo& cf); - void set_rx_counter(uint8_t port, rx_per_flow_t val) {m_rx_counter[port] = val;} - rx_per_flow_t get_rx_counter(uint8_t port) {return m_rx_counter[port] + m_rx_counter_base[port];} - void set_tx_counter(uint8_t port, tx_per_flow_t val) {m_tx_counter[port] = val;} - tx_per_flow_t get_tx_counter(uint8_t port) {return m_tx_counter[port] + m_tx_counter_base[port];} + void set_rx_cntr(uint8_t port, rx_per_flow_t val) {m_rx_cntr[port] = val;} + rx_per_flow_t get_rx_cntr(uint8_t port) {return m_rx_cntr[port] + m_rx_cntr_base[port];} + void set_tx_cntr(uint8_t port, tx_per_flow_t val) {m_tx_cntr[port] = val;} + tx_per_flow_t get_tx_cntr(uint8_t port) {return m_tx_cntr[port] + m_tx_cntr_base[port];} void set_hw_id(uint16_t hw_id) {m_hw_id = hw_id;} uint16_t get_hw_id() {return m_hw_id;} - void reset_hw_id(); + virtual void reset_hw_id(); bool is_hw_id() {return (m_hw_id != UINT16_MAX);} uint64_t get_proto() {return m_proto;} uint8_t get_ref_count() {return m_ref_count;} @@ -151,16 +243,20 @@ class CFlowStatUserIdInfo { void set_need_to_send_tx(uint8_t port) {m_tx_changed[port] = true;} bool was_sent() {return m_was_sent == true;} void set_was_sent(bool val) {m_was_sent = val;} + bool rfc2544_support() {return m_rfc2544_support;} + + protected: + bool m_rfc2544_support; private: bool m_rx_changed[TREX_MAX_PORTS]; // Which RX counters changed since we last published bool m_tx_changed[TREX_MAX_PORTS]; // Which TX counters changed since we last published - rx_per_flow_t m_rx_counter[TREX_MAX_PORTS]; // How many packets received with this user id since stream start + rx_per_flow_t m_rx_cntr[TREX_MAX_PORTS]; // How many packets received with this user id since stream start // How many packets received with this user id, since stream creation, before stream start. - rx_per_flow_t m_rx_counter_base[TREX_MAX_PORTS]; - tx_per_flow_t m_tx_counter[TREX_MAX_PORTS]; // How many packets transmitted with this user id since stream start + rx_per_flow_t m_rx_cntr_base[TREX_MAX_PORTS]; + tx_per_flow_t m_tx_cntr[TREX_MAX_PORTS]; // How many packets transmitted with this user id since stream start // How many packets transmitted with this user id, since stream creation, before stream start. - tx_per_flow_t m_tx_counter_base[TREX_MAX_PORTS]; + tx_per_flow_t m_tx_cntr_base[TREX_MAX_PORTS]; uint16_t m_hw_id; // Associated hw id. UINT16_MAX if no associated hw id. uint8_t m_proto; // protocol (UDP, TCP, other), associated with this user id. uint8_t m_ref_count; // How many streams with this user id exists @@ -173,8 +269,57 @@ typedef std::map::iterator flow_stat_user class CFlowStatUserIdInfoPayload : public CFlowStatUserIdInfo { public: - CFlowStatUserIdInfoPayload(uint8_t proto) : CFlowStatUserIdInfo(proto){}; + CFlowStatUserIdInfoPayload(uint8_t proto) : CFlowStatUserIdInfo(proto){m_rfc2544_support = true; clear();}; virtual void add_stream(uint8_t proto); + + void clear() { + m_rfc2544_info.clear(); + m_seq_error_base = 0; + m_out_of_order_base = 0; + } + inline void get_latency_json(std::string & json) const { + json = m_rfc2544_info.m_latency; + } + + inline void set_latency_json(std::string json) { + m_rfc2544_info.m_latency = json; + } + + inline double get_jitter() const { + return m_rfc2544_info.m_jitter; + } + + inline void set_jitter(double jitter) { + m_rfc2544_info.m_jitter = jitter; + } + + inline void set_seq_err_cnt(uint64_t cnt) { + m_rfc2544_info.m_seq_error = cnt; + } + + inline uint64_t get_seq_err_cnt() const { + return m_rfc2544_info.m_seq_error + m_seq_error_base; + } + + inline void set_ooo_cnt(uint64_t cnt) { + m_rfc2544_info.m_out_of_order = cnt; + } + + inline uint64_t get_ooo_cnt() const { + return m_rfc2544_info.m_out_of_order + m_out_of_order_base; + } + + inline void reset_hw_id() { + m_seq_error_base += m_rfc2544_info.m_seq_error; + m_out_of_order_base += m_rfc2544_info.m_out_of_order; + m_rfc2544_info.m_seq_error = 0; + m_rfc2544_info.m_out_of_order = 0; + } + + private: + rfc2544_info_t m_rfc2544_info; + uint64_t m_seq_error_base; + uint64_t m_out_of_order_base; }; class CFlowStatUserIdMap { @@ -202,13 +347,14 @@ class CFlowStatUserIdMap { class CFlowStatHwIdMap { public: CFlowStatHwIdMap(); + void create(uint16_t size); friend std::ostream& operator<<(std::ostream& os, const CFlowStatHwIdMap& cf); uint16_t find_free_hw_id(); void map(uint16_t hw_id, uint32_t user_id); void unmap(uint16_t hw_id); uint32_t get_user_id(uint16_t hw_id) {return m_map[hw_id];}; private: - uint32_t m_map[MAX_FLOW_STATS]; // translation from hw id to user id + uint32_t *m_map; // translation from hw id to user id uint16_t m_num_free; // How many free entries in the m_rules array }; @@ -241,8 +387,6 @@ class CFlowStatRuleMgr { private: CFlowStatHwIdMap m_hw_id_map; // map hw ids to user ids - // ??? need to make CFlowStatHwIdMap class adjustable per size. For now it is working since we allow same number - // of IP ID and pyaload rules CFlowStatHwIdMap m_hw_id_map_payload; // map hw id numbers of payload rules to user ids CFlowStatUserIdMap m_user_id_map; // map user ids to hw ids uint8_t m_num_ports; // How many ports are being used @@ -253,7 +397,7 @@ class CFlowStatRuleMgr { uint32_t m_num_started_streams; // How many started (transmitting) streams we have CNodeRing *m_ring_to_rx; // handle for sending messages to Rx core CFlowStatParser *m_parser; - uint16_t m_capabilities; + uint16_t m_cap; }; #endif diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h index fd6cdaf8..f60d1591 100644 --- a/src/internal_api/trex_platform_api.h +++ b/src/internal_api/trex_platform_api.h @@ -103,7 +103,7 @@ public: class TrexPlatformApi { public: - enum driver_stat_capabilities_e { + enum driver_stat_cap_e { IF_STAT_IPV4_ID = 1, IF_STAT_PAYLOAD = 2, IF_STAT_IPV6_FLOW_LABEL = 4, @@ -145,7 +145,9 @@ public: virtual void publish_async_data_now(uint32_t key, bool baseline) const = 0; virtual uint8_t get_dp_core_count() const = 0; virtual void get_interface_stat_info(uint8_t interface_id, uint16_t &num_counters, uint16_t &capabilities) const =0; - virtual int get_flow_stats(uint8_t port_id, void *stats, void *tx_stats, int min, int max, bool reset) const = 0; + virtual int get_flow_stats(uint8_t port_id, void *stats, void *tx_stats, int min, int max, bool reset + , TrexPlatformApi::driver_stat_cap_e type) const = 0; + virtual int get_rfc2544_info(void *rfc2544_info, int min, int max, bool reset) const = 0; virtual int reset_hw_flow_stats(uint8_t port_id) const = 0; virtual void get_port_num(uint8_t &port_num) const = 0; virtual int add_rx_flow_stat_rule(uint8_t port_id, uint8_t type, uint16_t proto, uint16_t id) const = 0; @@ -175,7 +177,9 @@ public: void publish_async_data_now(uint32_t key, bool baseline) const; uint8_t get_dp_core_count() const; void get_interface_stat_info(uint8_t interface_id, uint16_t &num_counters, uint16_t &capabilities) const; - int get_flow_stats(uint8_t port_id, void *stats, void *tx_stats, int min, int max, bool reset) const; + int get_flow_stats(uint8_t port_id, void *stats, void *tx_stats, int min, int max, bool reset + , TrexPlatformApi::driver_stat_cap_e type) const; + int get_rfc2544_info(void *rfc2544_info, int min, int max, bool reset) const; int reset_hw_flow_stats(uint8_t port_id) const; void get_port_num(uint8_t &port_num) const; int add_rx_flow_stat_rule(uint8_t port_id, uint8_t type, uint16_t proto, uint16_t id) const; @@ -230,7 +234,7 @@ public: virtual void publish_async_data_now(uint32_t key, bool baseline) const { } - virtual int get_flow_stats(uint8_t port_id, void *stats, void *tx_stats, int min, int max, bool reset) const {return 0;}; + virtual int get_rfc2544_info(void *rfc2544_info, int min, int max, bool reset) const {return 0;}; virtual int reset_hw_flow_stats(uint8_t port_id) const {return 0;}; virtual void get_port_num(uint8_t &port_num) const {port_num = 2;}; virtual int add_rx_flow_stat_rule(uint8_t port_id, uint8_t type, uint16_t proto, uint16_t id) const {return 0;} diff --git a/src/latency.cpp b/src/latency.cpp index 0303e89a..a7652bed 100644 --- a/src/latency.cpp +++ b/src/latency.cpp @@ -177,9 +177,6 @@ void CCPortLatency::reset(){ m_seq_error=0; m_length_error=0; m_no_ipv4_option=0; - for (int i = 0; i < MAX_FLOW_STATS; i++) { - m_rx_pg_stat[i].clear(); - } m_hist.Reset(); } @@ -568,7 +565,6 @@ bool CLatencyManager::Create(CLatencyManagerCfg * cfg){ return (true); } - void CLatencyManager::send_pkt_all_ports(){ m_start_time = os_get_hr_tick_64(); int i; diff --git a/src/latency.h b/src/latency.h index cfa523e4..eef7146a 100644 --- a/src/latency.h +++ b/src/latency.h @@ -246,12 +246,8 @@ public: uint64_t m_rx_check; uint64_t m_no_ipv4_option; uint64_t m_length_error; - rx_per_flow_t m_rx_pg_stat[MAX_FLOW_STATS]; - rx_per_flow_t m_rx_pg_payload_stats[MAX_FLOW_STATS_PAYLOAD]; - CTimeHistogram m_per_flow_hist[MAX_FLOW_STATS_PAYLOAD]; /* all window */ - CJitter m_per_flow_jitter[MAX_FLOW_STATS_PAYLOAD]; CTimeHistogram m_hist; /* all window */ - CJitter m_jitter; + CJitter m_jitter; }; diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index 669389a2..8b33ead8 100644 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -291,7 +291,7 @@ public: virtual int get_rx_stat_capabilities() { return TrexPlatformApi::IF_STAT_IPV4_ID | TrexPlatformApi::IF_STAT_RX_BYTES_COUNT | TrexPlatformApi::IF_STAT_PAYLOAD; - } + } virtual CFlowStatParser *get_flow_stat_parser(); }; @@ -1733,7 +1733,7 @@ public: virtual int close_file(void){ return (flush_tx_queue()); } - + __attribute__ ((noinline)) void send_node_flow_stat(CGenNode * node); virtual int send_node(CGenNode * node); virtual void send_one_pkt(pkt_dir_t dir, rte_mbuf_t *m); @@ -1784,6 +1784,7 @@ protected: class CCoreEthIFStateless : public CCoreEthIF { public: + virtual int send_node_flow_stat(CGenNodeStateless * node_sl); virtual int send_node(CGenNode * node); protected: int handle_slow_path_node(CGenNode *node); @@ -2004,7 +2005,65 @@ void CCoreEthIF::update_mac_addr(CGenNode * node,uint8_t *p){ } } +/// ??? need better implementation. Maybe implement as template of send_node. +// Maybe make it part of send_node somehow +int CCoreEthIFStateless::send_node_flow_stat(CGenNodeStateless * node_sl) { + uint16_t hw_id = node_sl->get_stat_hw_id(); + tx_per_flow_t *lp_s; + /* check that we have mbuf */ + rte_mbuf_t *temp_m = node_sl->get_cache_mbuf(); + rte_mbuf_t *m; + + if (temp_m) { + /* cache case */ + m = node_sl->alloc_flow_stat_mbuf(temp_m); + }else{ + temp_m = node_sl->alloc_node_with_vm(); + assert(temp_m); + m = node_sl->alloc_flow_stat_mbuf(temp_m); + rte_pktmbuf_free(temp_m); + } + + pkt_dir_t dir=(pkt_dir_t)node_sl->get_mbuf_cache_dir(); + CCorePerPort * lp_port=&m_ports[dir]; + CVirtualIFPerSideStats * lp_stats = &m_stats[dir]; + + if (hw_id >= MAX_FLOW_STATS) { + // payload rule hw_ids are in the range right above ip id rules + uint16_t hw_id_payload = hw_id - MAX_FLOW_STATS; + if (hw_id_payload > max_stat_hw_id_seen_payload) { + max_stat_hw_id_seen_payload = hw_id_payload; + } + uint8_t *p = rte_pktmbuf_mtod(m, uint8_t*); + struct flow_stat_payload_header *fsp_head = (struct flow_stat_payload_header *) + (p + m->pkt_len - sizeof(struct flow_stat_payload_header)); + fsp_head->seq = lp_stats->m_seq_num[hw_id_payload]; + fsp_head->time_stamp = os_get_hr_tick_64(); + lp_stats->m_seq_num[hw_id_payload]++; + // remove ??? +#if 0 + if (temp % 11 == 0) { + fsp_head->seq = lp_stats->m_seq_num[hw_id_payload]++; + } + + if ((temp -1) % 100 == 0) { + fsp_head->seq = lp_stats->m_seq_num[hw_id_payload] - 3; + lp_stats->m_seq_num[hw_id_payload]--; + } +#endif + } else { + // ip id rule + if (hw_id > max_stat_hw_id_seen) { + max_stat_hw_id_seen = hw_id; + } + } + lp_s = &lp_stats->m_tx_per_flow[hw_id]; + lp_s->add_pkts(1); + lp_s->add_bytes(m->pkt_len); + send_pkt(lp_port,m,lp_stats); + return 0; +} int CCoreEthIFStateless::send_node(CGenNode * no) { @@ -2013,8 +2072,12 @@ int CCoreEthIFStateless::send_node(CGenNode * no) { return handle_slow_path_node(no); } - CGenNodeStateless * node_sl=(CGenNodeStateless *) no; + + if (unlikely(node_sl->is_stat_needed())) { + return send_node_flow_stat(node_sl); + } + /* check that we have mbuf */ rte_mbuf_t * m; @@ -2035,27 +2098,6 @@ int CCoreEthIFStateless::send_node(CGenNode * no) { } } - if (unlikely(node_sl->is_stat_needed())) { - uint16_t hw_id = node_sl->get_stat_hw_id(); - tx_per_flow_t *lp_s; - if (hw_id >= MAX_FLOW_STATS) { - // payload rule - // payload rule hw_ids are in the range right above ip id rules - uint16_t hw_id_payload = hw_id - MAX_FLOW_STATS; - if (hw_id_payload > max_stat_hw_id_seen_payload) { - max_stat_hw_id_seen_payload = hw_id_payload; - } - //??? add seq num (m_seq_num[..], timestamp - } else { - // ip id rule - if (hw_id > max_stat_hw_id_seen) { - max_stat_hw_id_seen = hw_id; - } - } - lp_s = &lp_stats->m_tx_per_flow[hw_id]; - lp_s->add_pkts(1); - lp_s->add_bytes(m->pkt_len); - } send_pkt(lp_port,m,lp_stats); return (0); @@ -4162,14 +4204,13 @@ int CPhyEthIF::get_flow_stats(rx_per_flow_t *rx_stats, tx_per_flow_t *tx_stats, uint32_t diff_bytes[MAX_FLOW_STATS]; bool hw_rx_stat_supported = get_ex_drv()->hw_rx_stat_supported(); - // ???? if 40G, but payload rules, need to read from software if (hw_rx_stat_supported) { if (get_ex_drv()->get_rx_stats(this, diff_pkts, m_stats.m_fdir_prev_pkts , diff_bytes, m_stats.m_fdir_prev_bytes, min, max) < 0) { return -1; } } else { - g_trex.m_rx_sl.get_rx_stats(get_port_id(), rx_stats, min, max, reset); + g_trex.m_rx_sl.get_rx_stats(get_port_id(), rx_stats, min, max, reset, TrexPlatformApi::IF_STAT_IPV4_ID); } for (int i = min; i <= max; i++) { @@ -4206,6 +4247,23 @@ int CPhyEthIF::get_flow_stats(rx_per_flow_t *rx_stats, tx_per_flow_t *tx_stats, return 0; } +int CPhyEthIF::get_flow_stats_payload(rx_per_flow_t *rx_stats, tx_per_flow_t *tx_stats, int min, int max, bool reset) { + g_trex.m_rx_sl.get_rx_stats(get_port_id(), rx_stats, min, max, reset, TrexPlatformApi::IF_STAT_PAYLOAD); + for (int i = min; i <= max; i++) { + if ( reset ) { + if (tx_stats != NULL) { + tx_stats[i - min] = g_trex.clear_flow_tx_stats(m_port_id, i + MAX_FLOW_STATS); + } + } else { + if (tx_stats != NULL) { + tx_stats[i - min] = g_trex.get_flow_tx_stats(m_port_id, i + MAX_FLOW_STATS); + } + } + } + + return 0; +} + // If needed, send packets to rx core for processing. // This is relevant only in VM case, where we receive packets to the working DP core (only 1 DP core in this case) bool CCoreEthIF::process_rx_pkt(pkt_dir_t dir, @@ -5251,12 +5309,17 @@ int CTRexExtendedDriverBase40G::configure_rx_filter_rules_statfull(CPhyEthIF * _ return 0; } -const uint32_t TEMP_FDIR_HW_ID = 511; +const uint32_t FDIR_TEMP_HW_ID = 511; +const uint32_t FDIR_PAYLOAD_RULES_HW_ID = 510; +extern const uint16_t FLOW_STAT_PAYLOAD_IP_ID; int CTRexExtendedDriverBase40G::configure_rx_filter_rules(CPhyEthIF * _if) { if (get_is_stateless()) { - //??? if we add here one rule for IP/TCP/OTHER, it lowers our IP_ID support to 127 rules - rte_eth_fdir_stats_reset(_if->get_port_id(), NULL, TEMP_FDIR_HW_ID, 1); - return 0; // Rules are configured dynamically in stateless + uint32_t port_id = _if->get_port_id(); + add_del_rules(RTE_ETH_FILTER_ADD, port_id, RTE_ETH_FLOW_NONFRAG_IPV4_TCP, 0, FLOW_STAT_PAYLOAD_IP_ID, MAIN_DPDK_RX_Q, FDIR_PAYLOAD_RULES_HW_ID); + add_del_rules(RTE_ETH_FILTER_ADD, port_id, RTE_ETH_FLOW_NONFRAG_IPV4_UDP, 0, FLOW_STAT_PAYLOAD_IP_ID, MAIN_DPDK_RX_Q, FDIR_PAYLOAD_RULES_HW_ID); + add_del_rules(RTE_ETH_FILTER_ADD, port_id, RTE_ETH_FLOW_NONFRAG_IPV4_OTHER, 0, FLOW_STAT_PAYLOAD_IP_ID, MAIN_DPDK_RX_Q, FDIR_PAYLOAD_RULES_HW_ID); + rte_eth_fdir_stats_reset(_if->get_port_id(), NULL, FDIR_TEMP_HW_ID, 1); + return 0; // Other rules are configured dynamically in stateless } else { return configure_rx_filter_rules_statfull(_if); } @@ -5303,12 +5366,12 @@ int CTRexExtendedDriverBase40G::get_rx_stats(CPhyEthIF * _if, uint32_t *pkts, ui uint32_t counter, temp_count; uint32_t hw_id = start - min + i; - add_del_rules( RTE_ETH_FILTER_ADD, port_id, fdir_hw_id_rule_type[hw_id], 0, IP_ID_RESERVE_BASE + i, MAIN_DPDK_DATA_Q, TEMP_FDIR_HW_ID); + add_del_rules( RTE_ETH_FILTER_ADD, port_id, fdir_hw_id_rule_type[hw_id], 0, IP_ID_RESERVE_BASE + i, MAIN_DPDK_DATA_Q, FDIR_TEMP_HW_ID); delay(100); rte_eth_fdir_stats_reset(port_id, &counter, hw_id, 1); add_del_rules( RTE_ETH_FILTER_ADD, port_id, fdir_hw_id_rule_type[hw_id], 0, IP_ID_RESERVE_BASE + i, MAIN_DPDK_DATA_Q, hw_id); delay(100); - rte_eth_fdir_stats_reset(port_id, &temp_count, TEMP_FDIR_HW_ID, 1); + rte_eth_fdir_stats_reset(port_id, &temp_count, FDIR_TEMP_HW_ID, 1); pkts[i] = counter + temp_count - prev_pkts[i]; prev_pkts[i] = 0; } else { @@ -5593,8 +5656,19 @@ TrexDpdkPlatformApi::get_interface_stat_info(uint8_t interface_id, uint16_t &num capabilities = CTRexExtendedDriverDb::Ins()->get_drv()->get_rx_stat_capabilities(); } -int TrexDpdkPlatformApi::get_flow_stats(uint8 port_id, void *rx_stats, void *tx_stats, int min, int max, bool reset) const { - return g_trex.m_ports[port_id].get_flow_stats((rx_per_flow_t *)rx_stats, (tx_per_flow_t *)tx_stats, min, max, reset); +int TrexDpdkPlatformApi::get_flow_stats(uint8 port_id, void *rx_stats, void *tx_stats, int min, int max, bool reset + , TrexPlatformApi::driver_stat_cap_e type) const { + if (type == TrexPlatformApi::IF_STAT_PAYLOAD) { + return g_trex.m_ports[port_id].get_flow_stats_payload((rx_per_flow_t *)rx_stats, (tx_per_flow_t *)tx_stats + , min, max, reset); + } else { + return g_trex.m_ports[port_id].get_flow_stats((rx_per_flow_t *)rx_stats, (tx_per_flow_t *)tx_stats + , min, max, reset); + } +} + +int TrexDpdkPlatformApi::get_rfc2544_info(void *rfc2544_info, int min, int max, bool reset) const { + return g_trex.m_rx_sl.get_rfc2544_info((rfc2544_info_t *)rfc2544_info, min, max, reset); } int TrexDpdkPlatformApi::reset_hw_flow_stats(uint8_t port_id) const { diff --git a/src/main_dpdk.h b/src/main_dpdk.h index a9bfed39..c2169eea 100644 --- a/src/main_dpdk.h +++ b/src/main_dpdk.h @@ -76,6 +76,7 @@ class CPhyEthIF { int dump_fdir_global_stats(FILE *fd); int reset_hw_flow_stats(); int get_flow_stats(rx_per_flow_t *rx_stats, tx_per_flow_t *tx_stats, int min, int max, bool reset); + int get_flow_stats_payload(rx_per_flow_t *rx_stats, tx_per_flow_t *tx_stats, int min, int max, bool reset); void get_stats_1g(CPhyEthIFStats *stats); void rx_queue_setup(uint16_t rx_queue_id, uint16_t nb_rx_desc, diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h index a0daac9e..de45555a 100644 --- a/src/stateless/cp/trex_stream.h +++ b/src/stateless/cp/trex_stream.h @@ -551,7 +551,7 @@ public: bool m_enabled; bool m_seq_enabled; bool m_latency; - uint8_t m_rule_type; + uint16_t m_rule_type; uint32_t m_pg_id; uint16_t m_hw_id; } m_rx_check; diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index 833fb6e1..9c05c16b 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -211,6 +211,18 @@ std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state) return(res); } +rte_mbuf_t * CGenNodeStateless::alloc_flow_stat_mbuf(rte_mbuf_t *m) { + //????????? + // temp implementation. Just copy the entire mbuf + rte_mbuf_t *m_new = CGlobalInfo::pktmbuf_alloc( get_socket_id(), m->data_len ); + /* TBD remove this, should handle cases of error */ + assert(m_new); + char *p = rte_pktmbuf_mtod(m, char*); + char *p_new = rte_pktmbuf_append(m_new, m->data_len); + memcpy(p_new , p, m->data_len); + + return m_new; +} rte_mbuf_t * CGenNodeStateless::alloc_node_with_vm(){ diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h index b5395e78..4edf3a06 100644 --- a/src/stateless/dp/trex_stream_node.h +++ b/src/stateless/dp/trex_stream_node.h @@ -390,7 +390,7 @@ public: return (m_src_port); } - + rte_mbuf_t * alloc_flow_stat_mbuf(rte_mbuf_t *); //temp ??? rte_mbuf_t * alloc_node_with_vm(); void free_stl_node(); diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index f7658e53..1a5d9a7e 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -6,6 +6,13 @@ #include "trex_stateless_messaging.h" #include "trex_stateless_rx_core.h" +void CCPortLatencyStl::reset() { + for (int i = 0; i < MAX_FLOW_STATS; i++) { + m_rx_pg_stat[i].clear(); + m_rx_pg_stat_payload[i].clear(); + } +} + void CRxCoreStateless::create(const CRxSlCfg &cfg) { m_max_ports = cfg.m_max_ports; @@ -16,10 +23,20 @@ void CRxCoreStateless::create(const CRxSlCfg &cfg) { m_state = STATE_IDLE; for (int i = 0; i < m_max_ports; i++) { - CLatencyManagerPerPort * lp = &m_ports[i]; + CLatencyManagerPerPortStl * lp = &m_ports[i]; lp->m_io = cfg.m_ports[i]; + lp->m_port.reset(); } m_cpu_cp_u.Create(&m_cpu_dp_u); + + for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) { + m_per_flow_seq[i] = 0; + m_per_flow_hist[i].Reset(); + m_per_flow_jitter[i].reset(); + m_per_flow_seq_error[i] = 0; + m_per_flow_out_of_order[i] = 0; + m_per_flow_dup[i] = 0; + } } void CRxCoreStateless::handle_cp_msg(TrexStatelessCpToRxMsgBase *msg) { @@ -107,16 +124,54 @@ void CRxCoreStateless::start() { } } -void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPort *lp, rte_mbuf_t *m) { +void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPortStl *lp, rte_mbuf_t *m) { CFlowStatParser parser; if (parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) { uint16_t ip_id; if (parser.get_ip_id(ip_id) == 0) { if (is_flow_stat_id(ip_id)) { - uint16_t hw_id = get_hw_id(ip_id); - lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1); - lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len); + uint16_t hw_id; + if (is_flow_stat_payload_id(ip_id)) { + uint32_t seq; //??? handle seq wrap around + uint8_t *p = rte_pktmbuf_mtod(m, uint8_t*); + struct flow_stat_payload_header *fsp_head = (struct flow_stat_payload_header *) + (p + m->pkt_len - sizeof(struct flow_stat_payload_header)); + if (fsp_head->magic == FLOW_STAT_PAYLOAD_MAGIC) { + hw_id = fsp_head->hw_id; + seq = fsp_head->seq; + if (unlikely(seq != m_per_flow_seq[hw_id])) { + if (seq < m_per_flow_seq[hw_id]) { + if (seq == (m_per_flow_seq[hw_id] - 1)) { + m_per_flow_dup[hw_id] += 1; + printf("dup packets seq:%d %ld\n", seq, m_per_flow_seq[hw_id]); + } else { + m_per_flow_out_of_order[hw_id] += 1; + // We thought it was lost, but it was just out of order + m_per_flow_seq_error[hw_id] -= 1; + printf("ooo packets seq:%d %ld\n", seq, m_per_flow_seq[hw_id]); + } + } else { + // seq > m_per_flow_seq[hw_id] + printf("lost packets seq:%d %ld\n", seq, m_per_flow_seq[hw_id]); + m_per_flow_seq_error[hw_id] += seq - m_per_flow_seq[hw_id]; + m_per_flow_seq[hw_id] = seq + 1; + } + } else { + m_per_flow_seq[hw_id] = seq + 1; + } + lp->m_port.m_rx_pg_stat_payload[hw_id].add_pkts(1); + lp->m_port.m_rx_pg_stat_payload[hw_id].add_bytes(m->pkt_len); + uint64_t d = (os_get_hr_tick_64() - fsp_head->time_stamp ); + dsec_t ctime = ptime_convert_hr_dsec(d); + m_per_flow_hist[hw_id].Add(ctime); + m_per_flow_jitter[hw_id].calc(ctime); + } + } else { + hw_id = get_hw_id(ip_id); + lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1); + lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len); + } } } } @@ -136,7 +191,7 @@ void CRxCoreStateless::handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r) { CGenNodeLatencyPktInfo * l_msg; uint8_t msg_type = msg->m_msg_type; uint8_t rx_port_index; - CLatencyManagerPerPort * lp; + CLatencyManagerPerPortStl * lp; switch (msg_type) { case CGenNodeMsgBase::LATENCY_PKT: @@ -176,7 +231,7 @@ void CRxCoreStateless::flush_rx() { rte_mbuf_t * rx_pkts[64]; int i, total_pkts = 0; for (i = 0; i < m_max_ports; i++) { - CLatencyManagerPerPort * lp = &m_ports[i]; + CLatencyManagerPerPortStl * lp = &m_ports[i]; rte_mbuf_t * m; /* try to read 64 packets clean up the queue */ uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64); @@ -198,7 +253,7 @@ int CRxCoreStateless::try_rx() { rte_mbuf_t * rx_pkts[64]; int i, total_pkts = 0; for (i = 0; i < m_max_ports; i++) { - CLatencyManagerPerPort * lp = &m_ports[i]; + CLatencyManagerPerPortStl * lp = &m_ports[i]; rte_mbuf_t * m; /* try to read 64 packets clean up the queue */ uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64); @@ -223,6 +278,11 @@ bool CRxCoreStateless::is_flow_stat_id(uint16_t id) { return false; } +bool CRxCoreStateless::is_flow_stat_payload_id(uint16_t id) { + if (id == FLOW_STAT_PAYLOAD_IP_ID) return true; + return false; +} + uint16_t CRxCoreStateless::get_hw_id(uint16_t id) { return (0x00ff & id); } @@ -233,11 +293,39 @@ void CRxCoreStateless::reset_rx_stats(uint8_t port_id) { } } -int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset) { +int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max + , bool reset, TrexPlatformApi::driver_stat_cap_e type) { + for (int hw_id = min; hw_id <= max; hw_id++) { + if (type == TrexPlatformApi::IF_STAT_PAYLOAD) { + rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat_payload[hw_id]; + } else { + rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat[hw_id]; + } + if (reset) { + if (type == TrexPlatformApi::IF_STAT_PAYLOAD) { + m_ports[port_id].m_port.m_rx_pg_stat_payload[hw_id].clear(); + } else { + m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear(); + } + } + } + return 0; +} + +int CRxCoreStateless::get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset) { + std::string json; for (int hw_id = min; hw_id <= max; hw_id++) { - rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat[hw_id]; + rfc2544_info[hw_id - min].set_err_cntrs(m_per_flow_seq_error[hw_id], m_per_flow_out_of_order[hw_id]); + rfc2544_info[hw_id - min].set_jitter(m_per_flow_jitter[hw_id].get_jitter()); + m_per_flow_hist[hw_id].update(); + m_per_flow_hist[hw_id].dump_json("", json); + rfc2544_info[hw_id - min].set_latency_json(json); + if (reset) { - m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear(); + m_per_flow_seq_error[hw_id] = 0; + m_per_flow_out_of_order[hw_id] = 0; + m_per_flow_hist[hw_id].Reset(); + m_per_flow_jitter[hw_id].reset(); } } return 0; diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index a372a578..d946f920 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -26,6 +26,21 @@ class TrexStatelessCpToRxMsgBase; +class CCPortLatencyStl { + public: + void reset(); + + public: + rx_per_flow_t m_rx_pg_stat[MAX_FLOW_STATS]; + rx_per_flow_t m_rx_pg_stat_payload[MAX_FLOW_STATS_PAYLOAD]; +}; + +class CLatencyManagerPerPortStl { +public: + CCPortLatencyStl m_port; + CPortLatencyHWBase * m_io; +}; + class CRxSlCfg { public: CRxSlCfg (){ @@ -50,7 +65,9 @@ class CRxCoreStateless { void start(); void create(const CRxSlCfg &cfg); void reset_rx_stats(uint8_t port_id); - int get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset); + int get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset + , TrexPlatformApi::driver_stat_cap_e type); + int get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset); void work() {m_state = STATE_WORKING;} void idle() {m_state = STATE_IDLE;} void quit() {m_state = STATE_QUIT;} @@ -64,18 +81,19 @@ class CRxCoreStateless { void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg); bool periodic_check_for_cp_messages(); void idle_state_loop(); - void handle_rx_pkt(CLatencyManagerPerPort * lp, rte_mbuf_t * m); + void handle_rx_pkt(CLatencyManagerPerPortStl * lp, rte_mbuf_t * m); void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r); void flush_rx(); int try_rx(); void try_rx_queues(); bool is_flow_stat_id(uint16_t id); + bool is_flow_stat_payload_id(uint16_t id); uint16_t get_hw_id(uint16_t id); private: uint32_t m_max_ports; bool m_has_streams; - CLatencyManagerPerPort m_ports[TREX_MAX_PORTS]; + CLatencyManagerPerPortStl m_ports[TREX_MAX_PORTS]; state_e m_state; CNodeRing *m_ring_from_cp; CNodeRing *m_ring_to_cp; @@ -83,6 +101,11 @@ class CRxCoreStateless { CCpuUtlCp m_cpu_cp_u; // Used for acking "work" (go out of idle) messages from cp volatile bool m_ack_start_work_msg __rte_cache_aligned; - + uint64_t m_per_flow_seq[MAX_FLOW_STATS_PAYLOAD]; // expected next seq num + CTimeHistogram m_per_flow_hist[MAX_FLOW_STATS_PAYLOAD]; /* latency info */ + CJitter m_per_flow_jitter[MAX_FLOW_STATS_PAYLOAD]; + uint64_t m_per_flow_seq_error[MAX_FLOW_STATS_PAYLOAD]; // How many packet seq num gaps we saw (packets lost or out of order) + uint64_t m_per_flow_out_of_order[MAX_FLOW_STATS_PAYLOAD]; // Packets we got with seq num lower than expected + uint64_t m_per_flow_dup[MAX_FLOW_STATS_PAYLOAD]; // Packets we got with same seq num }; #endif diff --git a/src/time_histogram.cpp b/src/time_histogram.cpp index 96796bfc..a6b98079 100755 --- a/src/time_histogram.cpp +++ b/src/time_histogram.cpp @@ -212,7 +212,10 @@ void CTimeHistogram::Dump(FILE *fd){ void CTimeHistogram::dump_json(std::string name,std::string & json ){ char buff[200]; - sprintf(buff,"\"%s\":{",name.c_str()); + if (name != "") + sprintf(buff,"\"%s\":{",name.c_str()); + else + sprintf(buff,"{"); json+=std::string(buff); json+=add_json("min_usec",get_usec(m_min_delta)); -- cgit 1.2.3-korg