From a53f6be0617721b535086298095ad49057a7be69 Mon Sep 17 00:00:00 2001 From: Ido Barnea Date: Sun, 24 Apr 2016 14:43:28 +0300 Subject: Working version. temporary send_node that duplicates the mbuf data --- src/stateless/cp/trex_stream.h | 2 +- src/stateless/dp/trex_stateless_dp_core.cpp | 12 +++ src/stateless/dp/trex_stream_node.h | 2 +- src/stateless/rx/trex_stateless_rx_core.cpp | 110 +++++++++++++++++++++++++--- src/stateless/rx/trex_stateless_rx_core.h | 31 +++++++- 5 files changed, 140 insertions(+), 17 deletions(-) (limited to 'src/stateless') diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h index a0daac9e..de45555a 100644 --- a/src/stateless/cp/trex_stream.h +++ b/src/stateless/cp/trex_stream.h @@ -551,7 +551,7 @@ public: bool m_enabled; bool m_seq_enabled; bool m_latency; - uint8_t m_rule_type; + uint16_t m_rule_type; uint32_t m_pg_id; uint16_t m_hw_id; } m_rx_check; diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index 833fb6e1..9c05c16b 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -211,6 +211,18 @@ std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state) return(res); } +rte_mbuf_t * CGenNodeStateless::alloc_flow_stat_mbuf(rte_mbuf_t *m) { + //????????? + // temp implementation. Just copy the entire mbuf + rte_mbuf_t *m_new = CGlobalInfo::pktmbuf_alloc( get_socket_id(), m->data_len ); + /* TBD remove this, should handle cases of error */ + assert(m_new); + char *p = rte_pktmbuf_mtod(m, char*); + char *p_new = rte_pktmbuf_append(m_new, m->data_len); + memcpy(p_new , p, m->data_len); + + return m_new; +} rte_mbuf_t * CGenNodeStateless::alloc_node_with_vm(){ diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h index b5395e78..4edf3a06 100644 --- a/src/stateless/dp/trex_stream_node.h +++ b/src/stateless/dp/trex_stream_node.h @@ -390,7 +390,7 @@ public: return (m_src_port); } - + rte_mbuf_t * alloc_flow_stat_mbuf(rte_mbuf_t *); //temp ??? rte_mbuf_t * alloc_node_with_vm(); void free_stl_node(); diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index f7658e53..1a5d9a7e 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -6,6 +6,13 @@ #include "trex_stateless_messaging.h" #include "trex_stateless_rx_core.h" +void CCPortLatencyStl::reset() { + for (int i = 0; i < MAX_FLOW_STATS; i++) { + m_rx_pg_stat[i].clear(); + m_rx_pg_stat_payload[i].clear(); + } +} + void CRxCoreStateless::create(const CRxSlCfg &cfg) { m_max_ports = cfg.m_max_ports; @@ -16,10 +23,20 @@ void CRxCoreStateless::create(const CRxSlCfg &cfg) { m_state = STATE_IDLE; for (int i = 0; i < m_max_ports; i++) { - CLatencyManagerPerPort * lp = &m_ports[i]; + CLatencyManagerPerPortStl * lp = &m_ports[i]; lp->m_io = cfg.m_ports[i]; + lp->m_port.reset(); } m_cpu_cp_u.Create(&m_cpu_dp_u); + + for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) { + m_per_flow_seq[i] = 0; + m_per_flow_hist[i].Reset(); + m_per_flow_jitter[i].reset(); + m_per_flow_seq_error[i] = 0; + m_per_flow_out_of_order[i] = 0; + m_per_flow_dup[i] = 0; + } } void CRxCoreStateless::handle_cp_msg(TrexStatelessCpToRxMsgBase *msg) { @@ -107,16 +124,54 @@ void CRxCoreStateless::start() { } } -void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPort *lp, rte_mbuf_t *m) { +void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPortStl *lp, rte_mbuf_t *m) { CFlowStatParser parser; if (parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) { uint16_t ip_id; if (parser.get_ip_id(ip_id) == 0) { if (is_flow_stat_id(ip_id)) { - uint16_t hw_id = get_hw_id(ip_id); - lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1); - lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len); + uint16_t hw_id; + if (is_flow_stat_payload_id(ip_id)) { + uint32_t seq; //??? handle seq wrap around + uint8_t *p = rte_pktmbuf_mtod(m, uint8_t*); + struct flow_stat_payload_header *fsp_head = (struct flow_stat_payload_header *) + (p + m->pkt_len - sizeof(struct flow_stat_payload_header)); + if (fsp_head->magic == FLOW_STAT_PAYLOAD_MAGIC) { + hw_id = fsp_head->hw_id; + seq = fsp_head->seq; + if (unlikely(seq != m_per_flow_seq[hw_id])) { + if (seq < m_per_flow_seq[hw_id]) { + if (seq == (m_per_flow_seq[hw_id] - 1)) { + m_per_flow_dup[hw_id] += 1; + printf("dup packets seq:%d %ld\n", seq, m_per_flow_seq[hw_id]); + } else { + m_per_flow_out_of_order[hw_id] += 1; + // We thought it was lost, but it was just out of order + m_per_flow_seq_error[hw_id] -= 1; + printf("ooo packets seq:%d %ld\n", seq, m_per_flow_seq[hw_id]); + } + } else { + // seq > m_per_flow_seq[hw_id] + printf("lost packets seq:%d %ld\n", seq, m_per_flow_seq[hw_id]); + m_per_flow_seq_error[hw_id] += seq - m_per_flow_seq[hw_id]; + m_per_flow_seq[hw_id] = seq + 1; + } + } else { + m_per_flow_seq[hw_id] = seq + 1; + } + lp->m_port.m_rx_pg_stat_payload[hw_id].add_pkts(1); + lp->m_port.m_rx_pg_stat_payload[hw_id].add_bytes(m->pkt_len); + uint64_t d = (os_get_hr_tick_64() - fsp_head->time_stamp ); + dsec_t ctime = ptime_convert_hr_dsec(d); + m_per_flow_hist[hw_id].Add(ctime); + m_per_flow_jitter[hw_id].calc(ctime); + } + } else { + hw_id = get_hw_id(ip_id); + lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1); + lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len); + } } } } @@ -136,7 +191,7 @@ void CRxCoreStateless::handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r) { CGenNodeLatencyPktInfo * l_msg; uint8_t msg_type = msg->m_msg_type; uint8_t rx_port_index; - CLatencyManagerPerPort * lp; + CLatencyManagerPerPortStl * lp; switch (msg_type) { case CGenNodeMsgBase::LATENCY_PKT: @@ -176,7 +231,7 @@ void CRxCoreStateless::flush_rx() { rte_mbuf_t * rx_pkts[64]; int i, total_pkts = 0; for (i = 0; i < m_max_ports; i++) { - CLatencyManagerPerPort * lp = &m_ports[i]; + CLatencyManagerPerPortStl * lp = &m_ports[i]; rte_mbuf_t * m; /* try to read 64 packets clean up the queue */ uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64); @@ -198,7 +253,7 @@ int CRxCoreStateless::try_rx() { rte_mbuf_t * rx_pkts[64]; int i, total_pkts = 0; for (i = 0; i < m_max_ports; i++) { - CLatencyManagerPerPort * lp = &m_ports[i]; + CLatencyManagerPerPortStl * lp = &m_ports[i]; rte_mbuf_t * m; /* try to read 64 packets clean up the queue */ uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64); @@ -223,6 +278,11 @@ bool CRxCoreStateless::is_flow_stat_id(uint16_t id) { return false; } +bool CRxCoreStateless::is_flow_stat_payload_id(uint16_t id) { + if (id == FLOW_STAT_PAYLOAD_IP_ID) return true; + return false; +} + uint16_t CRxCoreStateless::get_hw_id(uint16_t id) { return (0x00ff & id); } @@ -233,11 +293,39 @@ void CRxCoreStateless::reset_rx_stats(uint8_t port_id) { } } -int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset) { +int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max + , bool reset, TrexPlatformApi::driver_stat_cap_e type) { + for (int hw_id = min; hw_id <= max; hw_id++) { + if (type == TrexPlatformApi::IF_STAT_PAYLOAD) { + rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat_payload[hw_id]; + } else { + rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat[hw_id]; + } + if (reset) { + if (type == TrexPlatformApi::IF_STAT_PAYLOAD) { + m_ports[port_id].m_port.m_rx_pg_stat_payload[hw_id].clear(); + } else { + m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear(); + } + } + } + return 0; +} + +int CRxCoreStateless::get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset) { + std::string json; for (int hw_id = min; hw_id <= max; hw_id++) { - rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat[hw_id]; + rfc2544_info[hw_id - min].set_err_cntrs(m_per_flow_seq_error[hw_id], m_per_flow_out_of_order[hw_id]); + rfc2544_info[hw_id - min].set_jitter(m_per_flow_jitter[hw_id].get_jitter()); + m_per_flow_hist[hw_id].update(); + m_per_flow_hist[hw_id].dump_json("", json); + rfc2544_info[hw_id - min].set_latency_json(json); + if (reset) { - m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear(); + m_per_flow_seq_error[hw_id] = 0; + m_per_flow_out_of_order[hw_id] = 0; + m_per_flow_hist[hw_id].Reset(); + m_per_flow_jitter[hw_id].reset(); } } return 0; diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index a372a578..d946f920 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -26,6 +26,21 @@ class TrexStatelessCpToRxMsgBase; +class CCPortLatencyStl { + public: + void reset(); + + public: + rx_per_flow_t m_rx_pg_stat[MAX_FLOW_STATS]; + rx_per_flow_t m_rx_pg_stat_payload[MAX_FLOW_STATS_PAYLOAD]; +}; + +class CLatencyManagerPerPortStl { +public: + CCPortLatencyStl m_port; + CPortLatencyHWBase * m_io; +}; + class CRxSlCfg { public: CRxSlCfg (){ @@ -50,7 +65,9 @@ class CRxCoreStateless { void start(); void create(const CRxSlCfg &cfg); void reset_rx_stats(uint8_t port_id); - int get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset); + int get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset + , TrexPlatformApi::driver_stat_cap_e type); + int get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset); void work() {m_state = STATE_WORKING;} void idle() {m_state = STATE_IDLE;} void quit() {m_state = STATE_QUIT;} @@ -64,18 +81,19 @@ class CRxCoreStateless { void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg); bool periodic_check_for_cp_messages(); void idle_state_loop(); - void handle_rx_pkt(CLatencyManagerPerPort * lp, rte_mbuf_t * m); + void handle_rx_pkt(CLatencyManagerPerPortStl * lp, rte_mbuf_t * m); void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r); void flush_rx(); int try_rx(); void try_rx_queues(); bool is_flow_stat_id(uint16_t id); + bool is_flow_stat_payload_id(uint16_t id); uint16_t get_hw_id(uint16_t id); private: uint32_t m_max_ports; bool m_has_streams; - CLatencyManagerPerPort m_ports[TREX_MAX_PORTS]; + CLatencyManagerPerPortStl m_ports[TREX_MAX_PORTS]; state_e m_state; CNodeRing *m_ring_from_cp; CNodeRing *m_ring_to_cp; @@ -83,6 +101,11 @@ class CRxCoreStateless { CCpuUtlCp m_cpu_cp_u; // Used for acking "work" (go out of idle) messages from cp volatile bool m_ack_start_work_msg __rte_cache_aligned; - + uint64_t m_per_flow_seq[MAX_FLOW_STATS_PAYLOAD]; // expected next seq num + CTimeHistogram m_per_flow_hist[MAX_FLOW_STATS_PAYLOAD]; /* latency info */ + CJitter m_per_flow_jitter[MAX_FLOW_STATS_PAYLOAD]; + uint64_t m_per_flow_seq_error[MAX_FLOW_STATS_PAYLOAD]; // How many packet seq num gaps we saw (packets lost or out of order) + uint64_t m_per_flow_out_of_order[MAX_FLOW_STATS_PAYLOAD]; // Packets we got with seq num lower than expected + uint64_t m_per_flow_dup[MAX_FLOW_STATS_PAYLOAD]; // Packets we got with same seq num }; #endif -- cgit 1.2.3-korg