summaryrefslogtreecommitdiffstats
path: root/src/stateless/rx/trex_stateless_rx_core.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/stateless/rx/trex_stateless_rx_core.cpp')
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.cpp110
1 files changed, 99 insertions, 11 deletions
diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp
index f7658e53..1a5d9a7e 100644
--- a/src/stateless/rx/trex_stateless_rx_core.cpp
+++ b/src/stateless/rx/trex_stateless_rx_core.cpp
@@ -6,6 +6,13 @@
#include "trex_stateless_messaging.h"
#include "trex_stateless_rx_core.h"
+void CCPortLatencyStl::reset() {
+ for (int i = 0; i < MAX_FLOW_STATS; i++) {
+ m_rx_pg_stat[i].clear();
+ m_rx_pg_stat_payload[i].clear();
+ }
+}
+
void CRxCoreStateless::create(const CRxSlCfg &cfg) {
m_max_ports = cfg.m_max_ports;
@@ -16,10 +23,20 @@ void CRxCoreStateless::create(const CRxSlCfg &cfg) {
m_state = STATE_IDLE;
for (int i = 0; i < m_max_ports; i++) {
- CLatencyManagerPerPort * lp = &m_ports[i];
+ CLatencyManagerPerPortStl * lp = &m_ports[i];
lp->m_io = cfg.m_ports[i];
+ lp->m_port.reset();
}
m_cpu_cp_u.Create(&m_cpu_dp_u);
+
+ for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) {
+ m_per_flow_seq[i] = 0;
+ m_per_flow_hist[i].Reset();
+ m_per_flow_jitter[i].reset();
+ m_per_flow_seq_error[i] = 0;
+ m_per_flow_out_of_order[i] = 0;
+ m_per_flow_dup[i] = 0;
+ }
}
void CRxCoreStateless::handle_cp_msg(TrexStatelessCpToRxMsgBase *msg) {
@@ -107,16 +124,54 @@ void CRxCoreStateless::start() {
}
}
-void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPort *lp, rte_mbuf_t *m) {
+void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPortStl *lp, rte_mbuf_t *m) {
CFlowStatParser parser;
if (parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) {
uint16_t ip_id;
if (parser.get_ip_id(ip_id) == 0) {
if (is_flow_stat_id(ip_id)) {
- uint16_t hw_id = get_hw_id(ip_id);
- lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1);
- lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len);
+ uint16_t hw_id;
+ if (is_flow_stat_payload_id(ip_id)) {
+ uint32_t seq; //??? handle seq wrap around
+ uint8_t *p = rte_pktmbuf_mtod(m, uint8_t*);
+ struct flow_stat_payload_header *fsp_head = (struct flow_stat_payload_header *)
+ (p + m->pkt_len - sizeof(struct flow_stat_payload_header));
+ if (fsp_head->magic == FLOW_STAT_PAYLOAD_MAGIC) {
+ hw_id = fsp_head->hw_id;
+ seq = fsp_head->seq;
+ if (unlikely(seq != m_per_flow_seq[hw_id])) {
+ if (seq < m_per_flow_seq[hw_id]) {
+ if (seq == (m_per_flow_seq[hw_id] - 1)) {
+ m_per_flow_dup[hw_id] += 1;
+ printf("dup packets seq:%d %ld\n", seq, m_per_flow_seq[hw_id]);
+ } else {
+ m_per_flow_out_of_order[hw_id] += 1;
+ // We thought it was lost, but it was just out of order
+ m_per_flow_seq_error[hw_id] -= 1;
+ printf("ooo packets seq:%d %ld\n", seq, m_per_flow_seq[hw_id]);
+ }
+ } else {
+ // seq > m_per_flow_seq[hw_id]
+ printf("lost packets seq:%d %ld\n", seq, m_per_flow_seq[hw_id]);
+ m_per_flow_seq_error[hw_id] += seq - m_per_flow_seq[hw_id];
+ m_per_flow_seq[hw_id] = seq + 1;
+ }
+ } else {
+ m_per_flow_seq[hw_id] = seq + 1;
+ }
+ lp->m_port.m_rx_pg_stat_payload[hw_id].add_pkts(1);
+ lp->m_port.m_rx_pg_stat_payload[hw_id].add_bytes(m->pkt_len);
+ uint64_t d = (os_get_hr_tick_64() - fsp_head->time_stamp );
+ dsec_t ctime = ptime_convert_hr_dsec(d);
+ m_per_flow_hist[hw_id].Add(ctime);
+ m_per_flow_jitter[hw_id].calc(ctime);
+ }
+ } else {
+ hw_id = get_hw_id(ip_id);
+ lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1);
+ lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len);
+ }
}
}
}
@@ -136,7 +191,7 @@ void CRxCoreStateless::handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r) {
CGenNodeLatencyPktInfo * l_msg;
uint8_t msg_type = msg->m_msg_type;
uint8_t rx_port_index;
- CLatencyManagerPerPort * lp;
+ CLatencyManagerPerPortStl * lp;
switch (msg_type) {
case CGenNodeMsgBase::LATENCY_PKT:
@@ -176,7 +231,7 @@ void CRxCoreStateless::flush_rx() {
rte_mbuf_t * rx_pkts[64];
int i, total_pkts = 0;
for (i = 0; i < m_max_ports; i++) {
- CLatencyManagerPerPort * lp = &m_ports[i];
+ CLatencyManagerPerPortStl * lp = &m_ports[i];
rte_mbuf_t * m;
/* try to read 64 packets clean up the queue */
uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64);
@@ -198,7 +253,7 @@ int CRxCoreStateless::try_rx() {
rte_mbuf_t * rx_pkts[64];
int i, total_pkts = 0;
for (i = 0; i < m_max_ports; i++) {
- CLatencyManagerPerPort * lp = &m_ports[i];
+ CLatencyManagerPerPortStl * lp = &m_ports[i];
rte_mbuf_t * m;
/* try to read 64 packets clean up the queue */
uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64);
@@ -223,6 +278,11 @@ bool CRxCoreStateless::is_flow_stat_id(uint16_t id) {
return false;
}
+bool CRxCoreStateless::is_flow_stat_payload_id(uint16_t id) {
+ if (id == FLOW_STAT_PAYLOAD_IP_ID) return true;
+ return false;
+}
+
uint16_t CRxCoreStateless::get_hw_id(uint16_t id) {
return (0x00ff & id);
}
@@ -233,11 +293,39 @@ void CRxCoreStateless::reset_rx_stats(uint8_t port_id) {
}
}
-int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset) {
+int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max
+ , bool reset, TrexPlatformApi::driver_stat_cap_e type) {
+ for (int hw_id = min; hw_id <= max; hw_id++) {
+ if (type == TrexPlatformApi::IF_STAT_PAYLOAD) {
+ rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat_payload[hw_id];
+ } else {
+ rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat[hw_id];
+ }
+ if (reset) {
+ if (type == TrexPlatformApi::IF_STAT_PAYLOAD) {
+ m_ports[port_id].m_port.m_rx_pg_stat_payload[hw_id].clear();
+ } else {
+ m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear();
+ }
+ }
+ }
+ return 0;
+}
+
+int CRxCoreStateless::get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset) {
+ std::string json;
for (int hw_id = min; hw_id <= max; hw_id++) {
- rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat[hw_id];
+ rfc2544_info[hw_id - min].set_err_cntrs(m_per_flow_seq_error[hw_id], m_per_flow_out_of_order[hw_id]);
+ rfc2544_info[hw_id - min].set_jitter(m_per_flow_jitter[hw_id].get_jitter());
+ m_per_flow_hist[hw_id].update();
+ m_per_flow_hist[hw_id].dump_json("", json);
+ rfc2544_info[hw_id - min].set_latency_json(json);
+
if (reset) {
- m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear();
+ m_per_flow_seq_error[hw_id] = 0;
+ m_per_flow_out_of_order[hw_id] = 0;
+ m_per_flow_hist[hw_id].Reset();
+ m_per_flow_jitter[hw_id].reset();
}
}
return 0;