From a53f6be0617721b535086298095ad49057a7be69 Mon Sep 17 00:00:00 2001 From: Ido Barnea Date: Sun, 24 Apr 2016 14:43:28 +0300 Subject: Working version. temporary send_node that duplicates the mbuf data --- src/flow_stat.cpp | 191 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 132 insertions(+), 59 deletions(-) (limited to 'src/flow_stat.cpp') diff --git a/src/flow_stat.cpp b/src/flow_stat.cpp index 5640b054..52d8129c 100644 --- a/src/flow_stat.cpp +++ b/src/flow_stat.cpp @@ -63,6 +63,7 @@ stream_del: HW_ID_INIT static const uint16_t HW_ID_INIT = UINT16_MAX; static const uint16_t HW_ID_FREE = UINT16_MAX - 1; static const uint8_t PAYLOAD_RULE_PROTO = 255; +const uint16_t FLOW_STAT_PAYLOAD_IP_ID = IP_ID_RESERVE_BASE + MAX_FLOW_STATS; inline std::string methodName(const std::string& prettyFunction) { @@ -87,10 +88,10 @@ inline std::string methodName(const std::string& prettyFunction) /************** class CFlowStatUserIdInfo ***************/ CFlowStatUserIdInfo::CFlowStatUserIdInfo(uint8_t proto) { - memset(m_rx_counter, 0, sizeof(m_rx_counter)); - memset(m_rx_counter_base, 0, sizeof(m_rx_counter)); - memset(m_tx_counter, 0, sizeof(m_tx_counter)); - memset(m_tx_counter_base, 0, sizeof(m_tx_counter)); + memset(m_rx_cntr, 0, sizeof(m_rx_cntr)); + memset(m_rx_cntr_base, 0, sizeof(m_rx_cntr)); + memset(m_tx_cntr, 0, sizeof(m_tx_cntr)); + memset(m_tx_cntr_base, 0, sizeof(m_tx_cntr)); m_hw_id = UINT16_MAX; m_proto = proto; m_ref_count = 1; @@ -100,34 +101,35 @@ CFlowStatUserIdInfo::CFlowStatUserIdInfo(uint8_t proto) { m_rx_changed[i] = false; m_tx_changed[i] = false; } + m_rfc2544_support = false; } std::ostream& operator<<(std::ostream& os, const CFlowStatUserIdInfo& cf) { os << "hw_id:" << cf.m_hw_id << " proto:" << (uint16_t) cf.m_proto << " ref(" << (uint16_t) cf.m_ref_count << "," << (uint16_t) cf.m_trans_ref_count << ")"; os << " rx count ("; - os << cf.m_rx_counter[0]; + os << cf.m_rx_cntr[0]; for (int i = 1; i < TREX_MAX_PORTS; i++) { - os << "," << cf.m_rx_counter[i]; + os << "," << cf.m_rx_cntr[i]; } os << ")"; os << " rx count base("; - os << cf.m_rx_counter_base[0]; + os << cf.m_rx_cntr_base[0]; for (int i = 1; i < TREX_MAX_PORTS; i++) { - os << "," << cf.m_rx_counter_base[i]; + os << "," << cf.m_rx_cntr_base[i]; } os << ")"; os << " tx count ("; - os << cf.m_tx_counter[0]; + os << cf.m_tx_cntr[0]; for (int i = 1; i < TREX_MAX_PORTS; i++) { - os << "," << cf.m_tx_counter[i]; + os << "," << cf.m_tx_cntr[i]; } os << ")"; os << " tx count base("; - os << cf.m_tx_counter_base[0]; + os << cf.m_tx_cntr_base[0]; for (int i = 1; i < TREX_MAX_PORTS; i++) { - os << "," << cf.m_tx_counter_base[i]; + os << "," << cf.m_tx_cntr_base[i]; } os << ")"; @@ -153,10 +155,10 @@ void CFlowStatUserIdInfo::reset_hw_id() { // we are not attached to hw. Save packet count of session. // Next session will start counting from 0. for (int i = 0; i < TREX_MAX_PORTS; i++) { - m_rx_counter_base[i] += m_rx_counter[i]; - memset(&m_rx_counter[i], 0, sizeof(m_rx_counter[0])); - m_tx_counter_base[i] += m_tx_counter[i]; - memset(&m_tx_counter[i], 0, sizeof(m_tx_counter[0])); + m_rx_cntr_base[i] += m_rx_cntr[i]; + memset(&m_rx_cntr[i], 0, sizeof(m_rx_cntr[0])); + m_tx_cntr_base[i] += m_tx_cntr[i]; + memset(&m_tx_cntr[i], 0, sizeof(m_tx_cntr[0])); } } @@ -215,9 +217,9 @@ CFlowStatUserIdMap::add_user_id(uint32_t user_id, uint8_t proto) { CFlowStatUserIdInfo *new_id = new CFlowStatUserIdInfo(proto); if (proto == PAYLOAD_RULE_PROTO) { - new_id = new CFlowStatUserIdInfo(proto); - } else { new_id = new CFlowStatUserIdInfoPayload(proto); + } else { + new_id = new CFlowStatUserIdInfo(proto); } if (new_id != NULL) { std::pair ret; @@ -368,15 +370,21 @@ uint16_t CFlowStatUserIdMap::unmap(uint32_t user_id) { return UINT16_MAX; } uint16_t old_hw_id = c_user_id->get_hw_id(); - c_user_id->reset_hw_id(); + c_user_id->reset_hw_id();//??? need to call reset of CFlowStatUserIdInfoPayload if needed return old_hw_id; } /************** class CFlowStatHwIdMap ***************/ CFlowStatHwIdMap::CFlowStatHwIdMap() { - m_num_free = MAX_FLOW_STATS; - for (int i = 0; i < MAX_FLOW_STATS; i++) { + m_map = NULL; +} + +void CFlowStatHwIdMap::create(uint16_t size) { + m_map = new uint32_t[size]; + assert (m_map != NULL); + m_num_free = size; + for (int i = 0; i < size; i++) { m_map[i] = HW_ID_FREE; } } @@ -434,9 +442,11 @@ CFlowStatRuleMgr::CFlowStatRuleMgr() { m_max_hw_id_payload = -1; m_num_started_streams = 0; m_ring_to_rx = NULL; - m_capabilities = 0; + m_cap = 0; m_parser = NULL; m_rx_core = NULL; + m_hw_id_map.create(MAX_FLOW_STATS); + m_hw_id_map_payload.create(MAX_FLOW_STATS_PAYLOAD); } CFlowStatRuleMgr::~CFlowStatRuleMgr() { @@ -445,13 +455,13 @@ CFlowStatRuleMgr::~CFlowStatRuleMgr() { } void CFlowStatRuleMgr::create() { - uint16_t num_counters, capabilities; + uint16_t num_counters, cap; TrexStateless *tstateless = get_stateless_obj(); assert(tstateless); m_api = tstateless->get_platform_api(); assert(m_api); - m_api->get_interface_stat_info(0, num_counters, capabilities); + m_api->get_interface_stat_info(0, num_counters, cap); m_api->get_port_num(m_num_ports); for (uint8_t port = 0; port < m_num_ports; port++) { assert(m_api->reset_hw_flow_stats(port) == 0); @@ -461,7 +471,7 @@ void CFlowStatRuleMgr::create() { m_rx_core = get_rx_sl_core_obj(); m_parser = m_api->get_flow_stat_parser(); assert(m_parser); - m_capabilities = capabilities; + m_cap = cap; } std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf) { @@ -527,9 +537,9 @@ int CFlowStatRuleMgr::add_stream(TrexStream * stream) { //??? put back assert(stream->m_rx_check.m_hw_id == HW_ID_INIT); - uint16_t rule_type = stream->m_rx_check.m_rule_type; + TrexPlatformApi::driver_stat_cap_e rule_type = (TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type; - if ((m_capabilities & rule_type) == 0) { + if ((m_cap & rule_type) == 0) { throw TrexFStatEx("Interface does not support given rule type", TrexException::T_FLOW_STAT_BAD_RULE_TYPE_FOR_IF); } @@ -580,7 +590,7 @@ int CFlowStatRuleMgr::del_stream(TrexStream * stream) { if (! m_api) throw TrexFStatEx("Called del_stream, but no stream was added", TrexException::T_FLOW_STAT_NO_STREAMS_EXIST); - uint16_t rule_type = stream->m_rx_check.m_rule_type; + TrexPlatformApi::driver_stat_cap_e rule_type = (TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type; switch(rule_type) { case TrexPlatformApi::IF_STAT_IPV4_ID: break; @@ -661,9 +671,9 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream) { , TrexException::T_FLOW_STAT_ALREADY_STARTED); } - uint16_t rule_type = stream->m_rx_check.m_rule_type; + TrexPlatformApi::driver_stat_cap_e rule_type = (TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type; - if ((m_capabilities & rule_type) == 0) { + if ((m_cap & rule_type) == 0) { throw TrexFStatEx("Interface does not support given rule type", TrexException::T_FLOW_STAT_BAD_RULE_TYPE_FOR_IF); } @@ -711,20 +721,28 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream) { m_hw_id_map_payload.map(hw_id, user_id); } // clear hardware counters. Just in case we have garbage from previous iteration - rx_per_flow_t rx_counter; - tx_per_flow_t tx_counter; + rx_per_flow_t rx_cntr; + tx_per_flow_t tx_cntr; + rfc2544_info_t rfc2544_info; for (uint8_t port = 0; port < m_num_ports; port++) { - m_api->get_flow_stats(port, &rx_counter, (void *)&tx_counter, hw_id, hw_id, true); + m_api->get_flow_stats(port, &rx_cntr, (void *)&tx_cntr, hw_id, hw_id, true, rule_type); + } + if (rule_type == TrexPlatformApi::IF_STAT_PAYLOAD) { + m_api->get_rfc2544_info(&rfc2544_info, hw_id, hw_id, true); } } } - m_parser->set_ip_id(IP_ID_RESERVE_BASE + hw_id); - // saving given hw_id on stream for use by tx statistics count if (rule_type == TrexPlatformApi::IF_STAT_IPV4_ID) { + m_parser->set_ip_id(IP_ID_RESERVE_BASE + hw_id); stream->m_rx_check.m_hw_id = hw_id; } else { + struct flow_stat_payload_header *fsp_head = (struct flow_stat_payload_header *) + (stream->m_pkt.binary + stream->m_pkt.len - sizeof(struct flow_stat_payload_header)); + fsp_head->hw_id = hw_id; + fsp_head->magic = FLOW_STAT_PAYLOAD_MAGIC; + m_parser->set_ip_id(FLOW_STAT_PAYLOAD_IP_ID); // for payload rules, we use the range right after ip id rules stream->m_rx_check.m_hw_id = hw_id + MAX_FLOW_STATS; } @@ -783,7 +801,7 @@ int CFlowStatRuleMgr::stop_stream(TrexStream * stream) { return 0; } - uint16_t rule_type = stream->m_rx_check.m_rule_type; + TrexPlatformApi::driver_stat_cap_e rule_type = (TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type; switch(rule_type) { case TrexPlatformApi::IF_STAT_IPV4_ID: break; @@ -811,28 +829,37 @@ int CFlowStatRuleMgr::stop_stream(TrexStream * stream) { p_user_id = m_user_id_map.find_user_id(m_hw_id_map.get_user_id(hw_id)); } else { p_user_id = m_user_id_map.find_user_id(m_hw_id_map_payload.get_user_id(hw_id)); - hw_id += MAX_FLOW_STATS; } assert(p_user_id != NULL); - rx_per_flow_t rx_counter; - tx_per_flow_t tx_counter; + rx_per_flow_t rx_cntr; + tx_per_flow_t tx_cntr; + rfc2544_info_t rfc2544_info; for (uint8_t port = 0; port < m_num_ports; port++) { if (rule_type == TrexPlatformApi::IF_STAT_IPV4_ID) { m_api->del_rx_flow_stat_rule(port, FLOW_STAT_RULE_TYPE_IPV4_ID, proto, hw_id); } - m_api->get_flow_stats(port, &rx_counter, (void *)&tx_counter, hw_id, hw_id, true); + m_api->get_flow_stats(port, &rx_cntr, (void *)&tx_cntr, hw_id, hw_id, true, rule_type); // when stopping, always send counters for stopped stream one last time - p_user_id->set_rx_counter(port, rx_counter); + p_user_id->set_rx_cntr(port, rx_cntr); p_user_id->set_need_to_send_rx(port); - p_user_id->set_tx_counter(port, tx_counter); + p_user_id->set_tx_cntr(port, tx_cntr); p_user_id->set_need_to_send_tx(port); } - m_user_id_map.unmap(stream->m_rx_check.m_pg_id); + if (rule_type == TrexPlatformApi::IF_STAT_IPV4_ID) { m_hw_id_map.unmap(hw_id); } else { + CFlowStatUserIdInfoPayload *p_user_id_p = (CFlowStatUserIdInfoPayload *)p_user_id; + std::string json; + m_api->get_rfc2544_info(&rfc2544_info, hw_id, hw_id, true); + p_user_id_p->set_jitter(rfc2544_info.get_jitter()); + rfc2544_info.get_latency_json(json); + p_user_id_p->set_latency_json(json); + p_user_id_p->set_seq_err_cnt(rfc2544_info.get_seq_err_cnt()); + p_user_id_p->set_ooo_cnt(rfc2544_info.get_ooo_cnt()); m_hw_id_map_payload.unmap(hw_id); } + m_user_id_map.unmap(stream->m_rx_check.m_pg_id); } } m_num_started_streams--; @@ -871,6 +898,7 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) { rx_per_flow_t rx_stats_payload[MAX_FLOW_STATS]; tx_per_flow_t tx_stats[MAX_FLOW_STATS]; tx_per_flow_t tx_stats_payload[MAX_FLOW_STATS_PAYLOAD]; + rfc2544_info_t rfc2544_info[MAX_FLOW_STATS_PAYLOAD]; Json::FastWriter writer; Json::Value root; @@ -890,18 +918,18 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) { return true; } + m_api->get_rfc2544_info(rfc2544_info, 0, m_max_hw_id_payload, false); + // read hw counters, and update for (uint8_t port = 0; port < m_num_ports; port++) { - m_api->get_flow_stats(port, rx_stats, (void *)tx_stats, 0, m_max_hw_id, false); - m_api->get_flow_stats(port, rx_stats_payload, (void *)tx_stats_payload, MAX_FLOW_STATS, MAX_FLOW_STATS + m_max_hw_id_payload, false); + m_api->get_flow_stats(port, rx_stats, (void *)tx_stats, 0, m_max_hw_id, false, TrexPlatformApi::IF_STAT_IPV4_ID); for (int i = 0; i <= m_max_hw_id; i++) { - //??? add rx for payload rules if (rx_stats[i].get_pkts() != 0) { rx_per_flow_t rx_pkts = rx_stats[i]; CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map.get_user_id(i)); if (likely(p_user_id != NULL)) { - if (p_user_id->get_rx_counter(port) != rx_pkts) { - p_user_id->set_rx_counter(port, rx_pkts); + if (p_user_id->get_rx_cntr(port) != rx_pkts) { + p_user_id->set_rx_cntr(port, rx_pkts); p_user_id->set_need_to_send_rx(port); } } else { @@ -913,8 +941,8 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) { tx_per_flow_t tx_pkts = tx_stats[i]; CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map.get_user_id(i)); if (likely(p_user_id != NULL)) { - if (p_user_id->get_tx_counter(port) != tx_pkts) { - p_user_id->set_tx_counter(port, tx_pkts); + if (p_user_id->get_tx_cntr(port) != tx_pkts) { + p_user_id->set_tx_cntr(port, tx_pkts); p_user_id->set_need_to_send_tx(port); } } else { @@ -924,13 +952,28 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) { } } // payload rules + m_api->get_flow_stats(port, rx_stats_payload, (void *)tx_stats_payload, 0, m_max_hw_id_payload + , false, TrexPlatformApi::IF_STAT_PAYLOAD); for (int i = 0; i <= m_max_hw_id_payload; i++) { + if (rx_stats_payload[i].get_pkts() != 0) { + rx_per_flow_t rx_pkts = rx_stats_payload[i]; + CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map_payload.get_user_id(i)); + if (likely(p_user_id != NULL)) { + if (p_user_id->get_rx_cntr(port) != rx_pkts) { + p_user_id->set_rx_cntr(port, rx_pkts); + p_user_id->set_need_to_send_rx(port); + } + } else { + std::cerr << __METHOD_NAME__ << i << ":Could not count " << rx_pkts << " rx payload packets, on port " + << (uint16_t)port << ", because no mapping was found." << std::endl; + } + } if (tx_stats_payload[i].get_pkts() != 0) { tx_per_flow_t tx_pkts = tx_stats_payload[i]; CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map_payload.get_user_id(i)); if (likely(p_user_id != NULL)) { - if (p_user_id->get_tx_counter(port) != tx_pkts) { - p_user_id->set_tx_counter(port, tx_pkts); + if (p_user_id->get_tx_cntr(port) != tx_pkts) { + p_user_id->set_tx_cntr(port, tx_pkts); p_user_id->set_need_to_send_tx(port); } } else { @@ -947,8 +990,7 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) { bool send_empty = true; CFlowStatUserIdInfo *user_id_info = it->second; uint32_t user_id = it->first; - std::string str_user_id = static_cast( &(std::ostringstream() - << user_id) )->str(); + std::string str_user_id = static_cast( &(std::ostringstream() << user_id) )->str(); if (! user_id_info->was_sent()) { data_section[str_user_id]["first_time"] = true; user_id_info->set_was_sent(true); @@ -958,18 +1000,49 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) { std::string str_port = static_cast( &(std::ostringstream() << int(port) ) )->str(); if (user_id_info->need_to_send_rx(port) || baseline) { user_id_info->set_no_need_to_send_rx(port); - data_section[str_user_id]["rx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_rx_counter(port).get_pkts()); - if (m_capabilities & TrexPlatformApi::IF_STAT_RX_BYTES_COUNT) - data_section[str_user_id]["rx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_rx_counter(port).get_bytes()); + data_section[str_user_id]["rx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_rx_cntr(port).get_pkts()); + if (m_cap & TrexPlatformApi::IF_STAT_RX_BYTES_COUNT) + //???put back data_section[str_user_id]["rx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_rx_cntr(port).get_bytes()); send_empty = false; } if (user_id_info->need_to_send_tx(port) || baseline) { user_id_info->set_no_need_to_send_tx(port); - data_section[str_user_id]["tx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_tx_counter(port).get_pkts()); - data_section[str_user_id]["tx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_tx_counter(port).get_bytes()); + data_section[str_user_id]["tx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_tx_cntr(port).get_pkts()); + //??? pub back data_section[str_user_id]["tx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_tx_cntr(port).get_bytes()); send_empty = false; } } + if (user_id_info->rfc2544_support()) { + CFlowStatUserIdInfoPayload *user_id_info_p = (CFlowStatUserIdInfoPayload *)user_id_info; + // payload object. Send also latency, jitter... + std::string json; + if (user_id_info->is_hw_id()) { + // if mapped to hw_id, take info from what we just got from rx core + uint16_t hw_id = user_id_info->get_hw_id(); + rfc2544_info[hw_id].get_latency_json(json); + user_id_info_p->set_seq_err_cnt(rfc2544_info[hw_id].get_seq_err_cnt()); + user_id_info_p->set_ooo_cnt(rfc2544_info[hw_id].get_ooo_cnt()); + data_section[str_user_id]["rfc2544"][""] = json; + data_section[str_user_id]["rfc2544"]["jitter"] = rfc2544_info[hw_id].get_jitter(); + } else { + // Not mapped to hw_id. Get saved info. + user_id_info_p->get_latency_json(json); + data_section[str_user_id]["rfc2544"]["latency"] = json; + data_section[str_user_id]["rfc2544"]["jitter"] = user_id_info_p->get_jitter(); + } + data_section[str_user_id]["rfc2544"]["err_cntrs"]["lost"] + = Json::Value::UInt64(user_id_info_p->get_seq_err_cnt()); + data_section[str_user_id]["rfc2544"]["err_cntrs"]["out_of_order"] + = Json::Value::UInt64(user_id_info_p->get_ooo_cnt()); + + //??? temp - remove + data_section[str_user_id]["tx_bytes"]["0"] + = Json::Value::UInt64(user_id_info_p->get_seq_err_cnt()); + data_section[str_user_id]["tx_bytes"]["1"] = 0; + data_section[str_user_id]["rx_bytes"]["0"] + = Json::Value::UInt64(user_id_info_p->get_ooo_cnt()); + data_section[str_user_id]["rx_bytes"]["1"] = 0; + } if (send_empty) { data_section[str_user_id] = Json::objectValue; } -- cgit 1.2.3-korg