summaryrefslogtreecommitdiffstats
path: root/src/stateless
diff options
context:
space:
mode:
authorIdo Barnea <ibarnea@cisco.com>2016-04-24 14:43:28 +0300
committerIdo Barnea <ibarnea@cisco.com>2016-05-18 19:20:21 +0300
commita53f6be0617721b535086298095ad49057a7be69 (patch)
tree3ccd8de6108cc3d769d25447cb39640bd2b2940d /src/stateless
parentd3b66fddf9840272b367f42e26ce16198eeadaf6 (diff)
Working version. temporary send_node that duplicates the mbuf data
Diffstat (limited to 'src/stateless')
-rw-r--r--src/stateless/cp/trex_stream.h2
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp12
-rw-r--r--src/stateless/dp/trex_stream_node.h2
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.cpp110
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.h31
5 files changed, 140 insertions, 17 deletions
diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h
index a0daac9e..de45555a 100644
--- a/src/stateless/cp/trex_stream.h
+++ b/src/stateless/cp/trex_stream.h
@@ -551,7 +551,7 @@ public:
bool m_enabled;
bool m_seq_enabled;
bool m_latency;
- uint8_t m_rule_type;
+ uint16_t m_rule_type;
uint32_t m_pg_id;
uint16_t m_hw_id;
} m_rx_check;
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index 833fb6e1..9c05c16b 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -211,6 +211,18 @@ std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state)
return(res);
}
+rte_mbuf_t * CGenNodeStateless::alloc_flow_stat_mbuf(rte_mbuf_t *m) {
+ //?????????
+ // temp implementation. Just copy the entire mbuf
+ rte_mbuf_t *m_new = CGlobalInfo::pktmbuf_alloc( get_socket_id(), m->data_len );
+ /* TBD remove this, should handle cases of error */
+ assert(m_new);
+ char *p = rte_pktmbuf_mtod(m, char*);
+ char *p_new = rte_pktmbuf_append(m_new, m->data_len);
+ memcpy(p_new , p, m->data_len);
+
+ return m_new;
+}
rte_mbuf_t * CGenNodeStateless::alloc_node_with_vm(){
diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h
index b5395e78..4edf3a06 100644
--- a/src/stateless/dp/trex_stream_node.h
+++ b/src/stateless/dp/trex_stream_node.h
@@ -390,7 +390,7 @@ public:
return (m_src_port);
}
-
+ rte_mbuf_t * alloc_flow_stat_mbuf(rte_mbuf_t *); //temp ???
rte_mbuf_t * alloc_node_with_vm();
void free_stl_node();
diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp
index f7658e53..1a5d9a7e 100644
--- a/src/stateless/rx/trex_stateless_rx_core.cpp
+++ b/src/stateless/rx/trex_stateless_rx_core.cpp
@@ -6,6 +6,13 @@
#include "trex_stateless_messaging.h"
#include "trex_stateless_rx_core.h"
+void CCPortLatencyStl::reset() {
+ for (int i = 0; i < MAX_FLOW_STATS; i++) {
+ m_rx_pg_stat[i].clear();
+ m_rx_pg_stat_payload[i].clear();
+ }
+}
+
void CRxCoreStateless::create(const CRxSlCfg &cfg) {
m_max_ports = cfg.m_max_ports;
@@ -16,10 +23,20 @@ void CRxCoreStateless::create(const CRxSlCfg &cfg) {
m_state = STATE_IDLE;
for (int i = 0; i < m_max_ports; i++) {
- CLatencyManagerPerPort * lp = &m_ports[i];
+ CLatencyManagerPerPortStl * lp = &m_ports[i];
lp->m_io = cfg.m_ports[i];
+ lp->m_port.reset();
}
m_cpu_cp_u.Create(&m_cpu_dp_u);
+
+ for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) {
+ m_per_flow_seq[i] = 0;
+ m_per_flow_hist[i].Reset();
+ m_per_flow_jitter[i].reset();
+ m_per_flow_seq_error[i] = 0;
+ m_per_flow_out_of_order[i] = 0;
+ m_per_flow_dup[i] = 0;
+ }
}
void CRxCoreStateless::handle_cp_msg(TrexStatelessCpToRxMsgBase *msg) {
@@ -107,16 +124,54 @@ void CRxCoreStateless::start() {
}
}
-void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPort *lp, rte_mbuf_t *m) {
+void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPortStl *lp, rte_mbuf_t *m) {
CFlowStatParser parser;
if (parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) {
uint16_t ip_id;
if (parser.get_ip_id(ip_id) == 0) {
if (is_flow_stat_id(ip_id)) {
- uint16_t hw_id = get_hw_id(ip_id);
- lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1);
- lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len);
+ uint16_t hw_id;
+ if (is_flow_stat_payload_id(ip_id)) {
+ uint32_t seq; //??? handle seq wrap around
+ uint8_t *p = rte_pktmbuf_mtod(m, uint8_t*);
+ struct flow_stat_payload_header *fsp_head = (struct flow_stat_payload_header *)
+ (p + m->pkt_len - sizeof(struct flow_stat_payload_header));
+ if (fsp_head->magic == FLOW_STAT_PAYLOAD_MAGIC) {
+ hw_id = fsp_head->hw_id;
+ seq = fsp_head->seq;
+ if (unlikely(seq != m_per_flow_seq[hw_id])) {
+ if (seq < m_per_flow_seq[hw_id]) {
+ if (seq == (m_per_flow_seq[hw_id] - 1)) {
+ m_per_flow_dup[hw_id] += 1;
+ printf("dup packets seq:%d %ld\n", seq, m_per_flow_seq[hw_id]);
+ } else {
+ m_per_flow_out_of_order[hw_id] += 1;
+ // We thought it was lost, but it was just out of order
+ m_per_flow_seq_error[hw_id] -= 1;
+ printf("ooo packets seq:%d %ld\n", seq, m_per_flow_seq[hw_id]);
+ }
+ } else {
+ // seq > m_per_flow_seq[hw_id]
+ printf("lost packets seq:%d %ld\n", seq, m_per_flow_seq[hw_id]);
+ m_per_flow_seq_error[hw_id] += seq - m_per_flow_seq[hw_id];
+ m_per_flow_seq[hw_id] = seq + 1;
+ }
+ } else {
+ m_per_flow_seq[hw_id] = seq + 1;
+ }
+ lp->m_port.m_rx_pg_stat_payload[hw_id].add_pkts(1);
+ lp->m_port.m_rx_pg_stat_payload[hw_id].add_bytes(m->pkt_len);
+ uint64_t d = (os_get_hr_tick_64() - fsp_head->time_stamp );
+ dsec_t ctime = ptime_convert_hr_dsec(d);
+ m_per_flow_hist[hw_id].Add(ctime);
+ m_per_flow_jitter[hw_id].calc(ctime);
+ }
+ } else {
+ hw_id = get_hw_id(ip_id);
+ lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1);
+ lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len);
+ }
}
}
}
@@ -136,7 +191,7 @@ void CRxCoreStateless::handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r) {
CGenNodeLatencyPktInfo * l_msg;
uint8_t msg_type = msg->m_msg_type;
uint8_t rx_port_index;
- CLatencyManagerPerPort * lp;
+ CLatencyManagerPerPortStl * lp;
switch (msg_type) {
case CGenNodeMsgBase::LATENCY_PKT:
@@ -176,7 +231,7 @@ void CRxCoreStateless::flush_rx() {
rte_mbuf_t * rx_pkts[64];
int i, total_pkts = 0;
for (i = 0; i < m_max_ports; i++) {
- CLatencyManagerPerPort * lp = &m_ports[i];
+ CLatencyManagerPerPortStl * lp = &m_ports[i];
rte_mbuf_t * m;
/* try to read 64 packets clean up the queue */
uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64);
@@ -198,7 +253,7 @@ int CRxCoreStateless::try_rx() {
rte_mbuf_t * rx_pkts[64];
int i, total_pkts = 0;
for (i = 0; i < m_max_ports; i++) {
- CLatencyManagerPerPort * lp = &m_ports[i];
+ CLatencyManagerPerPortStl * lp = &m_ports[i];
rte_mbuf_t * m;
/* try to read 64 packets clean up the queue */
uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64);
@@ -223,6 +278,11 @@ bool CRxCoreStateless::is_flow_stat_id(uint16_t id) {
return false;
}
+bool CRxCoreStateless::is_flow_stat_payload_id(uint16_t id) {
+ if (id == FLOW_STAT_PAYLOAD_IP_ID) return true;
+ return false;
+}
+
uint16_t CRxCoreStateless::get_hw_id(uint16_t id) {
return (0x00ff & id);
}
@@ -233,11 +293,39 @@ void CRxCoreStateless::reset_rx_stats(uint8_t port_id) {
}
}
-int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset) {
+int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max
+ , bool reset, TrexPlatformApi::driver_stat_cap_e type) {
+ for (int hw_id = min; hw_id <= max; hw_id++) {
+ if (type == TrexPlatformApi::IF_STAT_PAYLOAD) {
+ rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat_payload[hw_id];
+ } else {
+ rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat[hw_id];
+ }
+ if (reset) {
+ if (type == TrexPlatformApi::IF_STAT_PAYLOAD) {
+ m_ports[port_id].m_port.m_rx_pg_stat_payload[hw_id].clear();
+ } else {
+ m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear();
+ }
+ }
+ }
+ return 0;
+}
+
+int CRxCoreStateless::get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset) {
+ std::string json;
for (int hw_id = min; hw_id <= max; hw_id++) {
- rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat[hw_id];
+ rfc2544_info[hw_id - min].set_err_cntrs(m_per_flow_seq_error[hw_id], m_per_flow_out_of_order[hw_id]);
+ rfc2544_info[hw_id - min].set_jitter(m_per_flow_jitter[hw_id].get_jitter());
+ m_per_flow_hist[hw_id].update();
+ m_per_flow_hist[hw_id].dump_json("", json);
+ rfc2544_info[hw_id - min].set_latency_json(json);
+
if (reset) {
- m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear();
+ m_per_flow_seq_error[hw_id] = 0;
+ m_per_flow_out_of_order[hw_id] = 0;
+ m_per_flow_hist[hw_id].Reset();
+ m_per_flow_jitter[hw_id].reset();
}
}
return 0;
diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h
index a372a578..d946f920 100644
--- a/src/stateless/rx/trex_stateless_rx_core.h
+++ b/src/stateless/rx/trex_stateless_rx_core.h
@@ -26,6 +26,21 @@
class TrexStatelessCpToRxMsgBase;
+class CCPortLatencyStl {
+ public:
+ void reset();
+
+ public:
+ rx_per_flow_t m_rx_pg_stat[MAX_FLOW_STATS];
+ rx_per_flow_t m_rx_pg_stat_payload[MAX_FLOW_STATS_PAYLOAD];
+};
+
+class CLatencyManagerPerPortStl {
+public:
+ CCPortLatencyStl m_port;
+ CPortLatencyHWBase * m_io;
+};
+
class CRxSlCfg {
public:
CRxSlCfg (){
@@ -50,7 +65,9 @@ class CRxCoreStateless {
void start();
void create(const CRxSlCfg &cfg);
void reset_rx_stats(uint8_t port_id);
- int get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset);
+ int get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset
+ , TrexPlatformApi::driver_stat_cap_e type);
+ int get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset);
void work() {m_state = STATE_WORKING;}
void idle() {m_state = STATE_IDLE;}
void quit() {m_state = STATE_QUIT;}
@@ -64,18 +81,19 @@ class CRxCoreStateless {
void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg);
bool periodic_check_for_cp_messages();
void idle_state_loop();
- void handle_rx_pkt(CLatencyManagerPerPort * lp, rte_mbuf_t * m);
+ void handle_rx_pkt(CLatencyManagerPerPortStl * lp, rte_mbuf_t * m);
void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r);
void flush_rx();
int try_rx();
void try_rx_queues();
bool is_flow_stat_id(uint16_t id);
+ bool is_flow_stat_payload_id(uint16_t id);
uint16_t get_hw_id(uint16_t id);
private:
uint32_t m_max_ports;
bool m_has_streams;
- CLatencyManagerPerPort m_ports[TREX_MAX_PORTS];
+ CLatencyManagerPerPortStl m_ports[TREX_MAX_PORTS];
state_e m_state;
CNodeRing *m_ring_from_cp;
CNodeRing *m_ring_to_cp;
@@ -83,6 +101,11 @@ class CRxCoreStateless {
CCpuUtlCp m_cpu_cp_u;
// Used for acking "work" (go out of idle) messages from cp
volatile bool m_ack_start_work_msg __rte_cache_aligned;
-
+ uint64_t m_per_flow_seq[MAX_FLOW_STATS_PAYLOAD]; // expected next seq num
+ CTimeHistogram m_per_flow_hist[MAX_FLOW_STATS_PAYLOAD]; /* latency info */
+ CJitter m_per_flow_jitter[MAX_FLOW_STATS_PAYLOAD];
+ uint64_t m_per_flow_seq_error[MAX_FLOW_STATS_PAYLOAD]; // How many packet seq num gaps we saw (packets lost or out of order)
+ uint64_t m_per_flow_out_of_order[MAX_FLOW_STATS_PAYLOAD]; // Packets we got with seq num lower than expected
+ uint64_t m_per_flow_dup[MAX_FLOW_STATS_PAYLOAD]; // Packets we got with same seq num
};
#endif