From a53f6be0617721b535086298095ad49057a7be69 Mon Sep 17 00:00:00 2001 From: Ido Barnea Date: Sun, 24 Apr 2016 14:43:28 +0300 Subject: Working version. temporary send_node that duplicates the mbuf data --- src/stateless/rx/trex_stateless_rx_core.cpp | 110 +++++++++++++++++++++++++--- 1 file changed, 99 insertions(+), 11 deletions(-) (limited to 'src/stateless/rx/trex_stateless_rx_core.cpp') diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index f7658e53..1a5d9a7e 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -6,6 +6,13 @@ #include "trex_stateless_messaging.h" #include "trex_stateless_rx_core.h" +void CCPortLatencyStl::reset() { + for (int i = 0; i < MAX_FLOW_STATS; i++) { + m_rx_pg_stat[i].clear(); + m_rx_pg_stat_payload[i].clear(); + } +} + void CRxCoreStateless::create(const CRxSlCfg &cfg) { m_max_ports = cfg.m_max_ports; @@ -16,10 +23,20 @@ void CRxCoreStateless::create(const CRxSlCfg &cfg) { m_state = STATE_IDLE; for (int i = 0; i < m_max_ports; i++) { - CLatencyManagerPerPort * lp = &m_ports[i]; + CLatencyManagerPerPortStl * lp = &m_ports[i]; lp->m_io = cfg.m_ports[i]; + lp->m_port.reset(); } m_cpu_cp_u.Create(&m_cpu_dp_u); + + for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) { + m_per_flow_seq[i] = 0; + m_per_flow_hist[i].Reset(); + m_per_flow_jitter[i].reset(); + m_per_flow_seq_error[i] = 0; + m_per_flow_out_of_order[i] = 0; + m_per_flow_dup[i] = 0; + } } void CRxCoreStateless::handle_cp_msg(TrexStatelessCpToRxMsgBase *msg) { @@ -107,16 +124,54 @@ void CRxCoreStateless::start() { } } -void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPort *lp, rte_mbuf_t *m) { +void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPortStl *lp, rte_mbuf_t *m) { CFlowStatParser parser; if (parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) { uint16_t ip_id; if (parser.get_ip_id(ip_id) == 0) { if (is_flow_stat_id(ip_id)) { - uint16_t hw_id = get_hw_id(ip_id); - lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1); - lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len); + uint16_t hw_id; + if (is_flow_stat_payload_id(ip_id)) { + uint32_t seq; //??? handle seq wrap around + uint8_t *p = rte_pktmbuf_mtod(m, uint8_t*); + struct flow_stat_payload_header *fsp_head = (struct flow_stat_payload_header *) + (p + m->pkt_len - sizeof(struct flow_stat_payload_header)); + if (fsp_head->magic == FLOW_STAT_PAYLOAD_MAGIC) { + hw_id = fsp_head->hw_id; + seq = fsp_head->seq; + if (unlikely(seq != m_per_flow_seq[hw_id])) { + if (seq < m_per_flow_seq[hw_id]) { + if (seq == (m_per_flow_seq[hw_id] - 1)) { + m_per_flow_dup[hw_id] += 1; + printf("dup packets seq:%d %ld\n", seq, m_per_flow_seq[hw_id]); + } else { + m_per_flow_out_of_order[hw_id] += 1; + // We thought it was lost, but it was just out of order + m_per_flow_seq_error[hw_id] -= 1; + printf("ooo packets seq:%d %ld\n", seq, m_per_flow_seq[hw_id]); + } + } else { + // seq > m_per_flow_seq[hw_id] + printf("lost packets seq:%d %ld\n", seq, m_per_flow_seq[hw_id]); + m_per_flow_seq_error[hw_id] += seq - m_per_flow_seq[hw_id]; + m_per_flow_seq[hw_id] = seq + 1; + } + } else { + m_per_flow_seq[hw_id] = seq + 1; + } + lp->m_port.m_rx_pg_stat_payload[hw_id].add_pkts(1); + lp->m_port.m_rx_pg_stat_payload[hw_id].add_bytes(m->pkt_len); + uint64_t d = (os_get_hr_tick_64() - fsp_head->time_stamp ); + dsec_t ctime = ptime_convert_hr_dsec(d); + m_per_flow_hist[hw_id].Add(ctime); + m_per_flow_jitter[hw_id].calc(ctime); + } + } else { + hw_id = get_hw_id(ip_id); + lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1); + lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len); + } } } } @@ -136,7 +191,7 @@ void CRxCoreStateless::handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r) { CGenNodeLatencyPktInfo * l_msg; uint8_t msg_type = msg->m_msg_type; uint8_t rx_port_index; - CLatencyManagerPerPort * lp; + CLatencyManagerPerPortStl * lp; switch (msg_type) { case CGenNodeMsgBase::LATENCY_PKT: @@ -176,7 +231,7 @@ void CRxCoreStateless::flush_rx() { rte_mbuf_t * rx_pkts[64]; int i, total_pkts = 0; for (i = 0; i < m_max_ports; i++) { - CLatencyManagerPerPort * lp = &m_ports[i]; + CLatencyManagerPerPortStl * lp = &m_ports[i]; rte_mbuf_t * m; /* try to read 64 packets clean up the queue */ uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64); @@ -198,7 +253,7 @@ int CRxCoreStateless::try_rx() { rte_mbuf_t * rx_pkts[64]; int i, total_pkts = 0; for (i = 0; i < m_max_ports; i++) { - CLatencyManagerPerPort * lp = &m_ports[i]; + CLatencyManagerPerPortStl * lp = &m_ports[i]; rte_mbuf_t * m; /* try to read 64 packets clean up the queue */ uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64); @@ -223,6 +278,11 @@ bool CRxCoreStateless::is_flow_stat_id(uint16_t id) { return false; } +bool CRxCoreStateless::is_flow_stat_payload_id(uint16_t id) { + if (id == FLOW_STAT_PAYLOAD_IP_ID) return true; + return false; +} + uint16_t CRxCoreStateless::get_hw_id(uint16_t id) { return (0x00ff & id); } @@ -233,11 +293,39 @@ void CRxCoreStateless::reset_rx_stats(uint8_t port_id) { } } -int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset) { +int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max + , bool reset, TrexPlatformApi::driver_stat_cap_e type) { + for (int hw_id = min; hw_id <= max; hw_id++) { + if (type == TrexPlatformApi::IF_STAT_PAYLOAD) { + rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat_payload[hw_id]; + } else { + rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat[hw_id]; + } + if (reset) { + if (type == TrexPlatformApi::IF_STAT_PAYLOAD) { + m_ports[port_id].m_port.m_rx_pg_stat_payload[hw_id].clear(); + } else { + m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear(); + } + } + } + return 0; +} + +int CRxCoreStateless::get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset) { + std::string json; for (int hw_id = min; hw_id <= max; hw_id++) { - rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat[hw_id]; + rfc2544_info[hw_id - min].set_err_cntrs(m_per_flow_seq_error[hw_id], m_per_flow_out_of_order[hw_id]); + rfc2544_info[hw_id - min].set_jitter(m_per_flow_jitter[hw_id].get_jitter()); + m_per_flow_hist[hw_id].update(); + m_per_flow_hist[hw_id].dump_json("", json); + rfc2544_info[hw_id - min].set_latency_json(json); + if (reset) { - m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear(); + m_per_flow_seq_error[hw_id] = 0; + m_per_flow_out_of_order[hw_id] = 0; + m_per_flow_hist[hw_id].Reset(); + m_per_flow_jitter[hw_id].reset(); } } return 0; -- cgit 1.2.3-korg