summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py2
-rwxr-xr-xscripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py48
-rwxr-xr-xsrc/bp_sim.h5
-rw-r--r--src/flow_stat.cpp82
-rw-r--r--src/flow_stat.h18
-rw-r--r--src/gtest/trex_stateless_gtest.cpp20
-rw-r--r--src/internal_api/trex_platform_api.h2
-rw-r--r--src/main_dpdk.cpp117
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_stream.cpp8
-rw-r--r--src/stateless/cp/trex_stream.cpp1
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp3
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.cpp42
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.h2
13 files changed, 251 insertions, 99 deletions
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py
index eace5cf2..e30da00e 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py
@@ -942,7 +942,7 @@ class CRxStats(CTRexStats):
def calculate_bw_for_pg (self, pg_current, pg_prev = None, diff_sec = 0.0):
# no previous values
- if (pg_prev == None) or not (diff_sec > 0):
+ if (not pg_prev) or not (diff_sec > 0):
pg_current['tx_pps'] = {}
pg_current['tx_bps'] = {}
pg_current['tx_bps_L1'] = {}
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py
index a7fd3026..6264c17f 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py
@@ -210,25 +210,11 @@ STLStreamDstMAC_CFG_FILE=0
STLStreamDstMAC_PKT =1
STLStreamDstMAC_ARP =2
-# RX stats class
-class STLFlowStats(object):
- """ Define per stream stats
-
- .. code-block:: python
- :caption: STLFlowStats Example
-
- flow_stats = STLFlowStats(pg_id = 7)
-
- """
-
+class STLFlowStatsInterface(object):
def __init__ (self, pg_id):
self.fields = {}
-
self.fields['enabled'] = True
self.fields['stream_id'] = pg_id
- self.fields['seq_enabled'] = False
- self.fields['latency_enabled'] = False
-
def to_json (self):
""" Dump as json"""
@@ -238,6 +224,37 @@ class STLFlowStats(object):
def defaults ():
return {'enabled' : False}
+
+class STLFlowStats(STLFlowStatsInterface):
+ """ Define per stream basic stats
+
+ .. code-block:: python
+ :caption: STLFlowStats Example
+
+ flow_stats = STLFlowStats(pg_id = 7)
+
+ """
+
+ def __init__(self, pg_id):
+ super(STLFlowStats, self).__init__(pg_id)
+ self.fields['rule_type'] = 'stats'
+
+
+class STLFlowLatencyStats(STLFlowStatsInterface):
+ """ Define per stream basic stats + latency, jitter, packet reorder/loss
+
+ .. code-block:: python
+ :caption: STLFlowLatencyStats Example
+
+ flow_stats = STLFlowLatencyStats(pg_id = 7)
+
+ """
+
+ def __init__(self, pg_id):
+ super(STLFlowLatencyStats, self).__init__(pg_id)
+ self.fields['rule_type'] = 'latency'
+
+
class STLStream(object):
""" One stream object. Includes mode, Field Engine mode packet template and Rx stats
@@ -320,6 +337,7 @@ class STLStream(object):
# type checking
validate_type('mode', mode, STLTXMode)
validate_type('packet', packet, (type(None), CTrexPktBuilderInterface))
+ validate_type('flow_stats', flow_stats, (type(None), STLFlowStatsInterface))
validate_type('enabled', enabled, bool)
validate_type('self_start', self_start, bool)
validate_type('isg', isg, (int, float))
diff --git a/src/bp_sim.h b/src/bp_sim.h
index 97e14158..bb7dd928 100755
--- a/src/bp_sim.h
+++ b/src/bp_sim.h
@@ -274,7 +274,7 @@ public:
uint64_t m_tx_queue_full;
uint64_t m_tx_alloc_error;
tx_per_flow_t m_tx_per_flow[MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD];
- uint64_t m_seq_num[MAX_FLOW_STATS_PAYLOAD]; // seq num to put in packet for payload rules
+ uint32_t m_seq_num[MAX_FLOW_STATS_PAYLOAD]; // seq num to put in packet for payload rules
CPerTxthreadTemplateInfo m_template;
public:
@@ -297,6 +297,9 @@ public:
m_tx_alloc_error=0;
m_tx_queue_full=0;
m_template.Clear();
+ for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) {
+ m_seq_num[i] = UINT32_MAX - 1; // catch wrap around issues early
+ }
}
inline void Dump(FILE *fd);
diff --git a/src/flow_stat.cpp b/src/flow_stat.cpp
index 52d8129c..d63dc778 100644
--- a/src/flow_stat.cpp
+++ b/src/flow_stat.cpp
@@ -164,11 +164,18 @@ void CFlowStatUserIdInfo::reset_hw_id() {
/************** class CFlowStatUserIdInfoPayload ***************/
void CFlowStatUserIdInfoPayload::add_stream(uint8_t proto) {
- //??? add unit test
throw TrexFStatEx("Can't have two streams with same packet group id for payload rules"
, TrexException::T_FLOW_STAT_DUP_PG_ID);
}
+void CFlowStatUserIdInfoPayload::reset_hw_id() {
+ CFlowStatUserIdInfo::reset_hw_id();
+
+ m_seq_error_base += m_rfc2544_info.m_seq_error;
+ m_out_of_order_base += m_rfc2544_info.m_out_of_order;
+ m_rfc2544_info.m_seq_error = 0;
+ m_rfc2544_info.m_out_of_order = 0;
+}
/************** class CFlowStatUserIdMap ***************/
CFlowStatUserIdMap::CFlowStatUserIdMap() {
@@ -370,7 +377,7 @@ uint16_t CFlowStatUserIdMap::unmap(uint32_t user_id) {
return UINT16_MAX;
}
uint16_t old_hw_id = c_user_id->get_hw_id();
- c_user_id->reset_hw_id();//??? need to call reset of CFlowStatUserIdInfoPayload if needed
+ c_user_id->reset_hw_id();
return old_hw_id;
}
@@ -535,8 +542,6 @@ int CFlowStatRuleMgr::add_stream(TrexStream * stream) {
create();
}
- //??? put back assert(stream->m_rx_check.m_hw_id == HW_ID_INIT);
-
TrexPlatformApi::driver_stat_cap_e rule_type = (TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type;
if ((m_cap & rule_type) == 0) {
@@ -893,28 +898,36 @@ void CFlowStatRuleMgr::send_start_stop_msg_to_rx(bool is_start) {
}
// return false if no counters changed since last run. true otherwise
-bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) {
+// s_json - flow statistics json
+// l_json - latency data json
+// baseline - If true, send flow statistics fields even if they were not changed since last run
+bool CFlowStatRuleMgr::dump_json(std::string & s_json, std::string & l_json, bool baseline) {
rx_per_flow_t rx_stats[MAX_FLOW_STATS];
rx_per_flow_t rx_stats_payload[MAX_FLOW_STATS];
tx_per_flow_t tx_stats[MAX_FLOW_STATS];
tx_per_flow_t tx_stats_payload[MAX_FLOW_STATS_PAYLOAD];
rfc2544_info_t rfc2544_info[MAX_FLOW_STATS_PAYLOAD];
Json::FastWriter writer;
- Json::Value root;
+ Json::Value s_root;
+ Json::Value l_root;
- root["name"] = "flow_stats";
- root["type"] = 0;
+ s_root["name"] = "flow_stats";
+ s_root["type"] = 0;
+ l_root["name"] = "latency_stats";
+ l_root["type"] = 0;
if (baseline) {
- root["baseline"] = true;
+ s_root["baseline"] = true;
}
- Json::Value &data_section = root["data"];
- data_section["ts"]["value"] = Json::Value::UInt64(os_get_hr_tick_64());
- data_section["ts"]["freq"] = Json::Value::UInt64(os_get_hr_freq());
+ Json::Value &s_data_section = s_root["data"];
+ Json::Value &l_data_section = l_root["data"];
+ s_data_section["ts"]["value"] = Json::Value::UInt64(os_get_hr_tick_64());
+ s_data_section["ts"]["freq"] = Json::Value::UInt64(os_get_hr_freq());
if (m_user_id_map.is_empty()) {
- json = writer.write(root);
+ s_json = writer.write(s_root);
+ l_json = writer.write(l_root);
return true;
}
@@ -992,7 +1005,7 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) {
uint32_t user_id = it->first;
std::string str_user_id = static_cast<std::ostringstream*>( &(std::ostringstream() << user_id) )->str();
if (! user_id_info->was_sent()) {
- data_section[str_user_id]["first_time"] = true;
+ s_data_section[str_user_id]["first_time"] = true;
user_id_info->set_was_sent(true);
send_empty = false;
}
@@ -1000,18 +1013,22 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) {
std::string str_port = static_cast<std::ostringstream*>( &(std::ostringstream() << int(port) ) )->str();
if (user_id_info->need_to_send_rx(port) || baseline) {
user_id_info->set_no_need_to_send_rx(port);
- data_section[str_user_id]["rx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_rx_cntr(port).get_pkts());
+ s_data_section[str_user_id]["rx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_rx_cntr(port).get_pkts());
if (m_cap & TrexPlatformApi::IF_STAT_RX_BYTES_COUNT)
- //???put back data_section[str_user_id]["rx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_rx_cntr(port).get_bytes());
+ s_data_section[str_user_id]["rx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_rx_cntr(port).get_bytes());
send_empty = false;
}
if (user_id_info->need_to_send_tx(port) || baseline) {
user_id_info->set_no_need_to_send_tx(port);
- data_section[str_user_id]["tx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_tx_cntr(port).get_pkts());
- //??? pub back data_section[str_user_id]["tx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_tx_cntr(port).get_bytes());
+ s_data_section[str_user_id]["tx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_tx_cntr(port).get_pkts());
+ s_data_section[str_user_id]["tx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_tx_cntr(port).get_bytes());
send_empty = false;
}
}
+ if (send_empty) {
+ s_data_section[str_user_id] = Json::objectValue;
+ }
+
if (user_id_info->rfc2544_support()) {
CFlowStatUserIdInfoPayload *user_id_info_p = (CFlowStatUserIdInfoPayload *)user_id_info;
// payload object. Send also latency, jitter...
@@ -1022,33 +1039,34 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) {
rfc2544_info[hw_id].get_latency_json(json);
user_id_info_p->set_seq_err_cnt(rfc2544_info[hw_id].get_seq_err_cnt());
user_id_info_p->set_ooo_cnt(rfc2544_info[hw_id].get_ooo_cnt());
- data_section[str_user_id]["rfc2544"][""] = json;
- data_section[str_user_id]["rfc2544"]["jitter"] = rfc2544_info[hw_id].get_jitter();
+ l_data_section[str_user_id]["latency"] = json;
+ l_data_section[str_user_id]["jitter"] = rfc2544_info[hw_id].get_jitter();
} else {
// Not mapped to hw_id. Get saved info.
user_id_info_p->get_latency_json(json);
- data_section[str_user_id]["rfc2544"]["latency"] = json;
- data_section[str_user_id]["rfc2544"]["jitter"] = user_id_info_p->get_jitter();
+ l_data_section[str_user_id]["latency"] = json;
+ l_data_section[str_user_id]["jitter"] = user_id_info_p->get_jitter();
}
- data_section[str_user_id]["rfc2544"]["err_cntrs"]["lost"]
+ ///????? add last 10 samples
+ l_data_section[str_user_id]["err_cntrs"]["lost"]
= Json::Value::UInt64(user_id_info_p->get_seq_err_cnt());
- data_section[str_user_id]["rfc2544"]["err_cntrs"]["out_of_order"]
+ l_data_section[str_user_id]["err_cntrs"]["out_of_order"]
= Json::Value::UInt64(user_id_info_p->get_ooo_cnt());
//??? temp - remove
- data_section[str_user_id]["tx_bytes"]["0"]
+#if 0
+ s_data_section[str_user_id]["tx_bytes"]["0"]
= Json::Value::UInt64(user_id_info_p->get_seq_err_cnt());
- data_section[str_user_id]["tx_bytes"]["1"] = 0;
- data_section[str_user_id]["rx_bytes"]["0"]
+ s_data_section[str_user_id]["tx_bytes"]["1"] = 0;
+ s_data_section[str_user_id]["rx_bytes"]["0"]
= Json::Value::UInt64(user_id_info_p->get_ooo_cnt());
- data_section[str_user_id]["rx_bytes"]["1"] = 0;
- }
- if (send_empty) {
- data_section[str_user_id] = Json::objectValue;
+ s_data_section[str_user_id]["rx_bytes"]["1"] = 0;
+#endif
}
}
- json = writer.write(root);
+ s_json = writer.write(s_root);
+ l_json = writer.write(l_root);
// We always want to publish, even only the timestamp.
return true;
}
diff --git a/src/flow_stat.h b/src/flow_stat.h
index 36a4bad1..5bfab44a 100644
--- a/src/flow_stat.h
+++ b/src/flow_stat.h
@@ -43,10 +43,10 @@ typedef std::map<uint32_t, uint16_t>::iterator flow_stat_map_it_t;
class CRxCoreStateless;
struct flow_stat_payload_header {
- uint64_t time_stamp;
- uint16_t hw_id;
uint16_t magic;
+ uint16_t hw_id;
uint32_t seq;
+ uint64_t time_stamp;
};
@@ -246,7 +246,8 @@ class CFlowStatUserIdInfo {
bool rfc2544_support() {return m_rfc2544_support;}
protected:
- bool m_rfc2544_support;
+ bool m_rfc2544_support;
+ uint16_t m_hw_id; // Associated hw id. UINT16_MAX if no associated hw id.
private:
bool m_rx_changed[TREX_MAX_PORTS]; // Which RX counters changed since we last published
@@ -257,7 +258,6 @@ class CFlowStatUserIdInfo {
tx_per_flow_t m_tx_cntr[TREX_MAX_PORTS]; // How many packets transmitted with this user id since stream start
// How many packets transmitted with this user id, since stream creation, before stream start.
tx_per_flow_t m_tx_cntr_base[TREX_MAX_PORTS];
- uint16_t m_hw_id; // Associated hw id. UINT16_MAX if no associated hw id.
uint8_t m_proto; // protocol (UDP, TCP, other), associated with this user id.
uint8_t m_ref_count; // How many streams with this user id exists
uint8_t m_trans_ref_count; // How many streams with this user id currently transmit
@@ -309,13 +309,7 @@ class CFlowStatUserIdInfoPayload : public CFlowStatUserIdInfo {
return m_rfc2544_info.m_out_of_order + m_out_of_order_base;
}
- inline void reset_hw_id() {
- m_seq_error_base += m_rfc2544_info.m_seq_error;
- m_out_of_order_base += m_rfc2544_info.m_out_of_order;
- m_rfc2544_info.m_seq_error = 0;
- m_rfc2544_info.m_out_of_order = 0;
- }
-
+ inline void reset_hw_id();
private:
rfc2544_info_t m_rfc2544_info;
uint64_t m_seq_error_base;
@@ -377,7 +371,7 @@ class CFlowStatRuleMgr {
int start_stream(TrexStream * stream);
int stop_stream(TrexStream * stream);
int get_active_pgids(flow_stat_active_t &result);
- bool dump_json(std::string & json, bool baseline);
+ bool dump_json(std::string & s_json, std::string & l_json, bool baseline);
private:
void create();
diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp
index 18a09a2c..d5e36944 100644
--- a/src/gtest/trex_stateless_gtest.cpp
+++ b/src/gtest/trex_stateless_gtest.cpp
@@ -3729,6 +3729,7 @@ TEST_F(flow_stat, add_del_stream) {
TrexStream stream(TrexStream::stSINGLE_BURST, 0, 0);
TrexStream stream2(TrexStream::stSINGLE_BURST, 0, 0);
+ TrexStream stream3(TrexStream::stSINGLE_BURST, 0, 0);
stream.m_rx_check.m_enabled = true;
@@ -3762,17 +3763,29 @@ TEST_F(flow_stat, add_del_stream) {
test_pkt[27] = IPPROTO_UDP;
int ret = rule_mgr.add_stream(&stream);
assert (ret == 0);
+
+ stream3.m_rx_check.m_enabled = true;
+ stream3.m_rx_check.m_rule_type = TrexPlatformApi::IF_STAT_PAYLOAD;
+ stream3.m_rx_check.m_pg_id = 5; // same as first stream
+ stream3.m_pkt.binary = (uint8_t *)test_pkt;
+ stream3.m_pkt.len = sizeof(test_pkt);
+ try {
+ ret = rule_mgr.add_stream(&stream3);
+ } catch (TrexFStatEx e) {
+ assert(e.type() == TrexException::T_FLOW_STAT_DUP_PG_ID);
+ }
+
ret = rule_mgr.del_stream(&stream);
assert (ret == 0);
+ stream2.m_rx_check.m_enabled = true;
stream2.m_rx_check.m_rule_type = TrexPlatformApi::IF_STAT_IPV4_ID;
- stream2.m_rx_check.m_pg_id = 5; // ??? same as first stream
+ stream2.m_rx_check.m_pg_id = 5; // same as first stream
stream2.m_pkt.binary = (uint8_t *)test_pkt;
stream2.m_pkt.len = sizeof(test_pkt);
ret = rule_mgr.add_stream(&stream2);
assert (ret == 0);
-
ret = rule_mgr.del_stream(&stream2);
assert (ret == 0);
try {
@@ -3781,9 +3794,10 @@ TEST_F(flow_stat, add_del_stream) {
assert(e.type() == TrexException::T_FLOW_STAT_DEL_NON_EXIST);
}
- // do not want the constructor to try to free it
+ // do not want the destructor to try to free it
stream.m_pkt.binary = NULL;
stream2.m_pkt.binary = NULL;
+ stream3.m_pkt.binary = NULL;
}
TEST_F(flow_stat, start_stop_stream) {
diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h
index f60d1591..a52f9e60 100644
--- a/src/internal_api/trex_platform_api.h
+++ b/src/internal_api/trex_platform_api.h
@@ -234,6 +234,8 @@ public:
virtual void publish_async_data_now(uint32_t key, bool baseline) const {
}
+ int get_flow_stats(uint8_t port_id, void *stats, void *tx_stats, int min, int max, bool reset
+ , TrexPlatformApi::driver_stat_cap_e type) const {return 0;};
virtual int get_rfc2544_info(void *rfc2544_info, int min, int max, bool reset) const {return 0;};
virtual int reset_hw_flow_stats(uint8_t port_id) const {return 0;};
virtual void get_port_num(uint8_t &port_num) const {port_num = 2;};
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index 8b33ead8..ccd1aa8d 100644
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -1784,7 +1784,8 @@ protected:
class CCoreEthIFStateless : public CCoreEthIF {
public:
- virtual int send_node_flow_stat(CGenNodeStateless * node_sl);
+ virtual int send_node_flow_stat(rte_mbuf *m, CGenNodeStateless * node_sl, CCorePerPort * lp_port
+ , CVirtualIFPerSideStats * lp_stats);
virtual int send_node(CGenNode * node);
protected:
int handle_slow_path_node(CGenNode *node);
@@ -2005,9 +2006,89 @@ void CCoreEthIF::update_mac_addr(CGenNode * node,uint8_t *p){
}
}
-/// ??? need better implementation. Maybe implement as template of send_node.
+int CCoreEthIFStateless::send_node_flow_stat(rte_mbuf *m, CGenNodeStateless * node_sl, CCorePerPort * lp_port
+ , CVirtualIFPerSideStats * lp_stats) {
+ //??? remove
+# if 0
+ static int temp=1;
+ temp++;
+#endif
+
+ uint16_t hw_id = node_sl->get_stat_hw_id();
+ rte_mbuf *m_lat, *mi;
+
+ if (hw_id >= MAX_FLOW_STATS) {
+ // payload rule hw_ids are in the range right above ip id rules
+ uint16_t hw_id_payload = hw_id - MAX_FLOW_STATS;
+ if (hw_id_payload > max_stat_hw_id_seen_payload) {
+ max_stat_hw_id_seen_payload = hw_id_payload;
+ }
+ // alloc mbuf just for the latency header
+ m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), sizeof(struct flow_stat_payload_header));
+ if ( unlikely(m_lat == 0)) {
+ return -1;
+ }
+ char *p = rte_pktmbuf_append(m_lat, sizeof(struct flow_stat_payload_header));
+ struct flow_stat_payload_header *fsp_head = (struct flow_stat_payload_header *)p;
+ fsp_head->seq = lp_stats->m_seq_num[hw_id_payload];
+ fsp_head->time_stamp = os_get_hr_tick_64();
+ // ??? maybe following two lines can be done offline
+ fsp_head->hw_id = hw_id_payload;
+ fsp_head->magic = FLOW_STAT_PAYLOAD_MAGIC;
+
+ lp_stats->m_seq_num[hw_id_payload]++;
+
+ //??? remove
+#if 0
+ if (temp % 10 == 0) {
+ fsp_head->seq = lp_stats->m_seq_num[hw_id_payload]++;
+ }
+
+ if ((temp - 1) % 100 == 0) {
+ fsp_head->seq = lp_stats->m_seq_num[hw_id_payload] - 4;
+ // lp_stats->m_seq_num[hw_id_payload]--;
+ }
+#endif
+
+
+
+ if (rte_pktmbuf_is_contiguous(m)) {
+ // We have only the const mbuf
+ mi = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
+ assert(mi);
+ rte_pktmbuf_attach(mi, m);
+ rte_pktmbuf_trim(mi, sizeof(struct flow_stat_payload_header));
+ utl_rte_pktmbuf_add_after2(mi, m_lat);
+ } else {
+ // Field engine (vm) case.
+ rte_pktmbuf_trim(m, sizeof(struct flow_stat_payload_header));
+ utl_rte_pktmbuf_add_last(m, m_lat);
+ mi = m;
+ }
+
+ } else {
+ // ip id rule
+ if (hw_id > max_stat_hw_id_seen) {
+ max_stat_hw_id_seen = hw_id;
+ }
+ mi = m;
+ }
+ tx_per_flow_t *lp_s = &lp_stats->m_tx_per_flow[hw_id];
+ lp_s->add_pkts(1);
+ lp_s->add_bytes(mi->pkt_len);
+
+ send_pkt(lp_port, mi, lp_stats);
+ return 0;
+}
+
+#if 0
+//??? remove
// Maybe make it part of send_node somehow
int CCoreEthIFStateless::send_node_flow_stat(CGenNodeStateless * node_sl) {
+ //??? remove
+ static int temp=1;
+ temp++;
+
uint16_t hw_id = node_sl->get_stat_hw_id();
tx_per_flow_t *lp_s;
/* check that we have mbuf */
@@ -2041,14 +2122,14 @@ int CCoreEthIFStateless::send_node_flow_stat(CGenNodeStateless * node_sl) {
fsp_head->time_stamp = os_get_hr_tick_64();
lp_stats->m_seq_num[hw_id_payload]++;
// remove ???
-#if 0
- if (temp % 11 == 0) {
+
+ if (temp % 10 == 0) {
fsp_head->seq = lp_stats->m_seq_num[hw_id_payload]++;
}
-
- if ((temp -1) % 100 == 0) {
- fsp_head->seq = lp_stats->m_seq_num[hw_id_payload] - 3;
- lp_stats->m_seq_num[hw_id_payload]--;
+#if 1
+ if ((temp - 1) % 100 == 0) {
+ fsp_head->seq = lp_stats->m_seq_num[hw_id_payload] - 4;
+ // lp_stats->m_seq_num[hw_id_payload]--;
}
#endif
} else {
@@ -2064,9 +2145,9 @@ int CCoreEthIFStateless::send_node_flow_stat(CGenNodeStateless * node_sl) {
send_pkt(lp_port,m,lp_stats);
return 0;
}
+#endif
int CCoreEthIFStateless::send_node(CGenNode * no) {
-
/* if a node is marked as slow path - single IF to redirect it to slow path */
if (no->get_is_slow_path()) {
return handle_slow_path_node(no);
@@ -2074,10 +2155,6 @@ int CCoreEthIFStateless::send_node(CGenNode * no) {
CGenNodeStateless * node_sl=(CGenNodeStateless *) no;
- if (unlikely(node_sl->is_stat_needed())) {
- return send_node_flow_stat(node_sl);
- }
-
/* check that we have mbuf */
rte_mbuf_t * m;
@@ -2098,7 +2175,11 @@ int CCoreEthIFStateless::send_node(CGenNode * no) {
}
}
- send_pkt(lp_port,m,lp_stats);
+ if (unlikely(node_sl->is_stat_needed())) {
+ return send_node_flow_stat(m, node_sl, lp_port, lp_stats);
+ } else {
+ send_pkt(lp_port,m,lp_stats);
+ }
return (0);
};
@@ -3802,8 +3883,12 @@ CGlobalTRex::publish_async_data(bool sync_now, bool baseline) {
m_zmq_publisher.publish_json(json);
if (get_is_stateless()) {
- if (m_trex_stateless->m_rx_flow_stat.dump_json(json, baseline))
- m_zmq_publisher.publish_json(json);
+ std::string stat_json;
+ std::string latency_json;
+ if (m_trex_stateless->m_rx_flow_stat.dump_json(stat_json, latency_json, baseline)) {
+ m_zmq_publisher.publish_json(stat_json);
+ m_zmq_publisher.publish_json(latency_json);
+ }
}
}
diff --git a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
index d69b7d7f..736f3d02 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
@@ -115,8 +115,12 @@ TrexRpcCmdAddStream::_run(const Json::Value &params, Json::Value &result) {
}
stream->m_rx_check.m_pg_id = parse_int(rx, "stream_id", result);
- stream->m_rx_check.m_seq_enabled = parse_bool(rx, "seq_enabled", result);
- stream->m_rx_check.m_latency = parse_bool(rx, "latency_enabled", result);
+ std::string type = parse_string(rx, "rule_type", result);
+ if (type == "latency") {
+ stream->m_rx_check.m_rule_type = TrexPlatformApi::IF_STAT_PAYLOAD;
+ } else {
+ stream->m_rx_check.m_rule_type = TrexPlatformApi::IF_STAT_IPV4_ID;
+ }
}
/* make sure this is a valid stream to add */
diff --git a/src/stateless/cp/trex_stream.cpp b/src/stateless/cp/trex_stream.cpp
index 9021ebf4..6959476c 100644
--- a/src/stateless/cp/trex_stream.cpp
+++ b/src/stateless/cp/trex_stream.cpp
@@ -144,7 +144,6 @@ TrexStream::TrexStream(uint8_t type,
m_expected_pkt_len = 0;
m_rx_check.m_enabled = false;
- m_rx_check.m_rule_type = TrexPlatformApi::IF_STAT_PAYLOAD; // default for now. Should come from user???
m_burst_total_pkts=0;
m_num_bursts=1;
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index 9c05c16b..8bb89ee9 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -212,8 +212,7 @@ std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state)
}
rte_mbuf_t * CGenNodeStateless::alloc_flow_stat_mbuf(rte_mbuf_t *m) {
- //?????????
- // temp implementation. Just copy the entire mbuf
+ //????????? temp implementation. Just copy the entire mbuf
rte_mbuf_t *m_new = CGlobalInfo::pktmbuf_alloc( get_socket_id(), m->data_len );
/* TBD remove this, should handle cases of error */
assert(m_new);
diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp
index 1a5d9a7e..d624f455 100644
--- a/src/stateless/rx/trex_stateless_rx_core.cpp
+++ b/src/stateless/rx/trex_stateless_rx_core.cpp
@@ -30,7 +30,9 @@ void CRxCoreStateless::create(const CRxSlCfg &cfg) {
m_cpu_cp_u.Create(&m_cpu_dp_u);
for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) {
- m_per_flow_seq[i] = 0;
+ // This is the seq num value we expect next packet to have.
+ // Init value should match m_seq_num in CVirtualIFPerSideStats
+ m_per_flow_seq[i] = UINT32_MAX - 1; // catch wrap around issues early
m_per_flow_hist[i].Reset();
m_per_flow_jitter[i].reset();
m_per_flow_seq_error[i] = 0;
@@ -133,7 +135,7 @@ void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPortStl *lp, rte_mbuf_t *
if (is_flow_stat_id(ip_id)) {
uint16_t hw_id;
if (is_flow_stat_payload_id(ip_id)) {
- uint32_t seq; //??? handle seq wrap around
+ uint32_t seq;
uint8_t *p = rte_pktmbuf_mtod(m, uint8_t*);
struct flow_stat_payload_header *fsp_head = (struct flow_stat_payload_header *)
(p + m->pkt_len - sizeof(struct flow_stat_payload_header));
@@ -142,20 +144,34 @@ void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPortStl *lp, rte_mbuf_t *
seq = fsp_head->seq;
if (unlikely(seq != m_per_flow_seq[hw_id])) {
if (seq < m_per_flow_seq[hw_id]) {
- if (seq == (m_per_flow_seq[hw_id] - 1)) {
- m_per_flow_dup[hw_id] += 1;
- printf("dup packets seq:%d %ld\n", seq, m_per_flow_seq[hw_id]);
+ if (m_per_flow_seq[hw_id] - seq > 100000) {
+ // packet loss while we had wrap around
+ m_per_flow_seq_error[hw_id] += seq - m_per_flow_seq[hw_id];
+ m_per_flow_seq[hw_id] = seq + 1;
} else {
- m_per_flow_out_of_order[hw_id] += 1;
- // We thought it was lost, but it was just out of order
- m_per_flow_seq_error[hw_id] -= 1;
- printf("ooo packets seq:%d %ld\n", seq, m_per_flow_seq[hw_id]);
+ if (seq == (m_per_flow_seq[hw_id] - 1)) {
+ m_per_flow_dup[hw_id] += 1;
+ } else {
+ m_per_flow_out_of_order[hw_id] += 1;
+ // We thought it was lost, but it was just out of order
+ m_per_flow_seq_error[hw_id] -= 1;
+ }
}
} else {
- // seq > m_per_flow_seq[hw_id]
- printf("lost packets seq:%d %ld\n", seq, m_per_flow_seq[hw_id]);
- m_per_flow_seq_error[hw_id] += seq - m_per_flow_seq[hw_id];
- m_per_flow_seq[hw_id] = seq + 1;
+ if (unlikely (m_per_flow_seq[hw_id] - seq > 100000)) {
+ // packet reorder while we had wrap around
+ if (seq == (m_per_flow_seq[hw_id] - 1)) {
+ m_per_flow_dup[hw_id] += 1;
+ } else {
+ m_per_flow_out_of_order[hw_id] += 1;
+ // We thought it was lost, but it was just out of order
+ m_per_flow_seq_error[hw_id] -= 1;
+ }
+ } else {
+ // seq > m_per_flow_seq[hw_id]. Assuming lost packets
+ m_per_flow_seq_error[hw_id] += seq - m_per_flow_seq[hw_id];
+ m_per_flow_seq[hw_id] = seq + 1;
+ }
}
} else {
m_per_flow_seq[hw_id] = seq + 1;
diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h
index d946f920..2ebd209a 100644
--- a/src/stateless/rx/trex_stateless_rx_core.h
+++ b/src/stateless/rx/trex_stateless_rx_core.h
@@ -101,7 +101,7 @@ class CRxCoreStateless {
CCpuUtlCp m_cpu_cp_u;
// Used for acking "work" (go out of idle) messages from cp
volatile bool m_ack_start_work_msg __rte_cache_aligned;
- uint64_t m_per_flow_seq[MAX_FLOW_STATS_PAYLOAD]; // expected next seq num
+ uint32_t m_per_flow_seq[MAX_FLOW_STATS_PAYLOAD]; // expected next seq num
CTimeHistogram m_per_flow_hist[MAX_FLOW_STATS_PAYLOAD]; /* latency info */
CJitter m_per_flow_jitter[MAX_FLOW_STATS_PAYLOAD];
uint64_t m_per_flow_seq_error[MAX_FLOW_STATS_PAYLOAD]; // How many packet seq num gaps we saw (packets lost or out of order)