summaryrefslogtreecommitdiffstats
path: root/src/stateless
diff options
context:
space:
mode:
authoritraviv <itraviv@cisco.com>2016-07-31 11:56:41 +0300
committeritraviv <itraviv@cisco.com>2016-07-31 11:56:41 +0300
commit893d0feef9ba6fa3fb36c49f4b5bcad47cb2bf60 (patch)
tree689a09fa656f990672d2d62143dc173a46fe0316 /src/stateless
parentabf329075bd14f5f41c3753d560260ac809ec4f3 (diff)
parentdceb010b01e9f8a0e9c905370d39f149f01cab7e (diff)
Merge branch 'master' into scapy_server
Diffstat (limited to 'src/stateless')
-rw-r--r--src/stateless/cp/trex_stateless.cpp38
-rw-r--r--src/stateless/cp/trex_stateless.h6
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp81
-rw-r--r--src/stateless/cp/trex_stateless_port.h1
-rw-r--r--src/stateless/cp/trex_stream.cpp2
-rw-r--r--src/stateless/cp/trex_stream.h2
-rw-r--r--src/stateless/cp/trex_stream_vm.cpp72
-rw-r--r--src/stateless/cp/trex_stream_vm.h6
-rw-r--r--src/stateless/cp/trex_streams_compiler.cpp33
-rw-r--r--src/stateless/cp/trex_streams_compiler.h2
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp35
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h2
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.cpp121
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.h39
14 files changed, 343 insertions, 97 deletions
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp
index 698ede90..6d80539c 100644
--- a/src/stateless/cp/trex_stateless.cpp
+++ b/src/stateless/cp/trex_stateless.cpp
@@ -54,7 +54,11 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) {
m_publisher = cfg.m_publisher;
/* API core version */
- m_api_classes[APIClass::API_CLASS_TYPE_CORE].init(APIClass::API_CLASS_TYPE_CORE, 1, 2);
+ const int API_VER_MAJOR = 1;
+ const int API_VER_MINOR = 3;
+ m_api_classes[APIClass::API_CLASS_TYPE_CORE].init(APIClass::API_CLASS_TYPE_CORE,
+ API_VER_MAJOR,
+ API_VER_MINOR);
}
/**
@@ -64,6 +68,8 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) {
*/
TrexStateless::~TrexStateless() {
+ shutdown();
+
/* release memory for ports */
for (auto port : m_ports) {
delete port;
@@ -71,15 +77,33 @@ TrexStateless::~TrexStateless() {
m_ports.clear();
/* stops the RPC server */
- m_rpc_server->stop();
- delete m_rpc_server;
-
- m_rpc_server = NULL;
+ if (m_rpc_server) {
+ delete m_rpc_server;
+ m_rpc_server = NULL;
+ }
- delete m_platform_api;
- m_platform_api = NULL;
+ if (m_platform_api) {
+ delete m_platform_api;
+ m_platform_api = NULL;
+ }
}
+/**
+* shutdown the server
+*/
+void TrexStateless::shutdown() {
+
+ /* stop ports */
+ for (TrexStatelessPort *port : m_ports) {
+ /* safe to call stop even if not active */
+ port->stop_traffic();
+ }
+
+ /* shutdown the RPC server */
+ if (m_rpc_server) {
+ m_rpc_server->stop();
+ }
+}
/**
* starts the control plane side
diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h
index 83ab6976..7ea669df 100644
--- a/src/stateless/cp/trex_stateless.h
+++ b/src/stateless/cp/trex_stateless.h
@@ -132,6 +132,11 @@ public:
/**
+ * shutdown the server
+ */
+ void shutdown();
+
+ /**
* fetch all the stats
*
*/
@@ -188,6 +193,7 @@ protected:
/* API */
APIClass m_api_classes[APIClass::API_CLASS_TYPE_MAX];
+
};
/**
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 4dc3e449..0fe4b410 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -87,6 +87,68 @@ protected:
}
};
+/*************************************
+ * Streams Feeder
+ * A class that holds a temporary
+ * clone of streams that can be
+ * manipulated
+ *
+ * this is a RAII object meant for
+ * graceful cleanup
+ ************************************/
+class StreamsFeeder {
+public:
+ StreamsFeeder(TrexStatelessPort *port) {
+
+ /* start pesimistic */
+ m_success = false;
+
+ /* fetch the original streams */
+ port->get_object_list(m_in_streams);
+
+ for (const TrexStream *in_stream : m_in_streams) {
+ TrexStream *out_stream = in_stream->clone(true);
+
+ get_stateless_obj()->m_rx_flow_stat.start_stream(out_stream);
+
+ m_out_streams.push_back(out_stream);
+ }
+ }
+
+ void set_status(bool status) {
+ m_success = status;
+ }
+
+ vector<TrexStream *> &get_streams() {
+ return m_out_streams;
+ }
+
+ /**
+ * RAII
+ */
+ ~StreamsFeeder() {
+ for (int i = 0; i < m_out_streams.size(); i++) {
+ TrexStream *out_stream = m_out_streams[i];
+ TrexStream *in_stream = m_in_streams[i];
+
+ if (m_success) {
+ /* success path */
+ get_stateless_obj()->m_rx_flow_stat.copy_state(out_stream, in_stream);
+ } else {
+ /* fail path */
+ get_stateless_obj()->m_rx_flow_stat.stop_stream(out_stream);
+ }
+ delete out_stream;
+ }
+ }
+
+private:
+ vector<TrexStream *> m_in_streams;
+ vector<TrexStream *> m_out_streams;
+ bool m_success;
+};
+
+
/***************************
* trex stateless port
*
@@ -193,10 +255,7 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
/* caclulate the effective factor for DP */
double factor = calculate_effective_factor(mul, force);
- /* fetch all the streams from the table */
- vector<TrexStream *> streams;
- get_object_list(streams);
-
+ StreamsFeeder feeder(this);
/* compiler it */
std::vector<TrexStreamsCompiledObj *> compiled_objs;
@@ -204,15 +263,19 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
TrexStreamsCompiler compiler;
bool rc = compiler.compile(m_port_id,
- streams,
+ feeder.get_streams(),
compiled_objs,
get_dp_core_count(),
factor,
&fail_msg);
+
if (!rc) {
+ feeder.set_status(false);
throw TrexException(fail_msg);
}
+ feeder.set_status(true);
+
/* generate a message to all the relevant DP cores to start transmitting */
assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID);
m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent());
@@ -628,6 +691,9 @@ TrexStatelessPort::calculate_effective_factor_internal(const TrexPortMultiplier
case TrexPortMultiplier::MUL_BPS:
return m_graph_obj->get_factor_bps_l2(mul.m_value);
+ case TrexPortMultiplier::MUL_BPSL1:
+ return m_graph_obj->get_factor_bps_l1(mul.m_value);
+
case TrexPortMultiplier::MUL_PPS:
return m_graph_obj->get_factor_pps(mul.m_value);
@@ -678,7 +744,7 @@ TrexStatelessPort::delete_streams_graph() {
* port multiplier
*
**************************/
-const std::initializer_list<std::string> TrexPortMultiplier::g_types = {"raw", "bps", "pps", "percentage"};
+const std::initializer_list<std::string> TrexPortMultiplier::g_types = {"raw", "bps", "bpsl1", "pps", "percentage"};
const std::initializer_list<std::string> TrexPortMultiplier::g_ops = {"abs", "add", "sub"};
TrexPortMultiplier::
@@ -692,6 +758,9 @@ TrexPortMultiplier(const std::string &type_str, const std::string &op_str, doubl
} else if (type_str == "bps") {
type = MUL_BPS;
+ } else if (type_str == "bpsl1") {
+ type = MUL_BPSL1;
+
} else if (type_str == "pps") {
type = MUL_PPS;
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 8856b429..915c5325 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -477,6 +477,7 @@ public:
enum mul_type_e {
MUL_FACTOR,
MUL_BPS,
+ MUL_BPSL1,
MUL_PPS,
MUL_PERCENTAGE
};
diff --git a/src/stateless/cp/trex_stream.cpp b/src/stateless/cp/trex_stream.cpp
index adc08ae2..7a3dfef7 100644
--- a/src/stateless/cp/trex_stream.cpp
+++ b/src/stateless/cp/trex_stream.cpp
@@ -141,7 +141,7 @@ TrexStream::TrexStream(uint8_t type,
m_pkt.binary = NULL;
m_pkt.len = 0;
- m_expected_pkt_len = 0;
+ m_expected_pkt_len = 0.0;
m_rx_check.m_enabled = false;
diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h
index 8ac5ebc8..0f59356d 100644
--- a/src/stateless/cp/trex_stream.h
+++ b/src/stateless/cp/trex_stream.h
@@ -558,7 +558,7 @@ public:
StreamVmDp *m_vm_dp;
CStreamPktData m_pkt;
- uint16_t m_expected_pkt_len;
+ double m_expected_pkt_len;
/* pkt */
diff --git a/src/stateless/cp/trex_stream_vm.cpp b/src/stateless/cp/trex_stream_vm.cpp
index 7a1dc122..9e4fbe1c 100644
--- a/src/stateless/cp/trex_stream_vm.cpp
+++ b/src/stateless/cp/trex_stream_vm.cpp
@@ -27,7 +27,64 @@ limitations under the License.
#include <common/Network/Packet/IPHeader.h>
#include <common/basic_utils.h>
+/**
+ * provides some tools for the fast rand function
+ * that is used by the datapath
+ * some features of this function is different
+ * from a regular random
+ * (such as average can be off by few percents)
+ *
+ * @author imarom (7/24/2016)
+ */
+class FastRandUtils {
+public:
+
+ /**
+ * searches the target in the cache
+ * if not found iterativly calculate it
+ * and add it to the cache
+ *
+ */
+ double calc_fastrand_avg(uint16_t target) {
+ auto search = m_avg_cache.find(target);
+ if (search != m_avg_cache.end()) {
+ return search->second;
+ }
+
+ /* not found - calculate it */
+ double avg = iterate_calc(target);
+
+ /* if there is enough space - to the cache */
+ if (m_avg_cache.size() <= G_MAX_CACHE_SIZE) {
+ m_avg_cache[target] = avg;
+ }
+
+ return avg;
+ }
+private:
+
+ /**
+ * hard calculate a value using iterations
+ *
+ */
+ double iterate_calc(uint16_t target) {
+ const int num_samples = 10000;
+ uint64_t tmp = 0;
+ uint32_t seed = 1;
+
+ for (int i = 0; i < num_samples; i++) {
+ tmp += fastrand(seed) % (target + 1);
+ }
+
+ return (tmp / double(num_samples));
+ }
+
+ std::unordered_map<uint16_t, double> m_avg_cache;
+ static const uint16_t G_MAX_CACHE_SIZE = 9230;
+};
+
+static FastRandUtils g_fastrand_util;
void StreamVmInstructionFixChecksumIpv4::Dump(FILE *fd){
@@ -350,9 +407,18 @@ void StreamVm::build_flow_var_table() {
var.m_ins.m_ins_flowv->m_min_value =60;
}
- m_expected_pkt_size = (var.m_ins.m_ins_flowv->m_min_value + var.m_ins.m_ins_flowv->m_max_value) / 2;
+ /* expected packet size calculation */
+
+ /* for random packet size - we need to find the average */
+ if (var.m_ins.m_ins_flowv->m_op == StreamVmInstructionFlowMan::FLOW_VAR_OP_RANDOM) {
+ uint16_t range = var.m_ins.m_ins_flowv->m_max_value - var.m_ins.m_ins_flowv->m_min_value;
+ m_expected_pkt_size = var.m_ins.m_ins_flowv->m_min_value + g_fastrand_util.calc_fastrand_avg(range);
+ } else {
+ m_expected_pkt_size = (var.m_ins.m_ins_flowv->m_min_value + var.m_ins.m_ins_flowv->m_max_value) / 2.0;
+ }
+
}
- }/* for */
+ }
}
@@ -962,7 +1028,7 @@ StreamVm::~StreamVm() {
* calculate expected packet size of stream's VM
*
*/
-uint16_t
+double
StreamVm::calc_expected_pkt_size(uint16_t regular_pkt_size) const {
/* if no packet size change - simply return the regular packet size */
diff --git a/src/stateless/cp/trex_stream_vm.h b/src/stateless/cp/trex_stream_vm.h
index 58d43bd4..f55d33c1 100644
--- a/src/stateless/cp/trex_stream_vm.h
+++ b/src/stateless/cp/trex_stream_vm.h
@@ -1417,7 +1417,7 @@ public:
m_prefix_size=0;
m_bss=0;
m_pkt_size=0;
- m_expected_pkt_size=0;
+ m_expected_pkt_size=0.0;
m_cur_var_offset=0;
m_is_random_var=false;
@@ -1435,7 +1435,7 @@ public:
* if the VM changes the packet length (random)
*
*/
- uint16_t calc_expected_pkt_size(uint16_t regular_pkt_size) const;
+ double calc_expected_pkt_size(uint16_t regular_pkt_size) const;
@@ -1576,7 +1576,7 @@ private:
uint16_t m_prefix_size;
uint16_t m_pkt_size;
- uint16_t m_expected_pkt_size;
+ double m_expected_pkt_size;
uint16_t m_cur_var_offset;
uint16_t m_max_field_update; /* the location of the last byte that is going to be changed in the packet */
diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp
index f2296aeb..e54c5f9c 100644
--- a/src/stateless/cp/trex_streams_compiler.cpp
+++ b/src/stateless/cp/trex_streams_compiler.cpp
@@ -382,7 +382,13 @@ TrexStreamsCompiler::compile(uint8_t port_id,
assert(dp_core_count > 0);
try {
- return compile_internal(port_id,streams,objs,dp_core_count,factor,fail_msg);
+ return compile_internal(port_id,
+ streams,
+ objs,
+ dp_core_count,
+ factor,
+ fail_msg);
+
} catch (const TrexException &ex) {
if (fail_msg) {
*fail_msg = ex.what();
@@ -411,7 +417,6 @@ TrexStreamsCompiler::compile_internal(uint8_t por
GraphNodeMap nodes;
-
/* compile checks */
pre_compile_check(streams, nodes);
@@ -474,7 +479,7 @@ TrexStreamsCompiler::compile_on_single_core(uint8_t
}
/* compile all the streams */
- for (auto stream : streams) {
+ for (auto const stream : streams) {
/* skip non-enabled streams */
if (!stream->m_enabled) {
@@ -507,7 +512,7 @@ TrexStreamsCompiler::compile_on_all_cores(uint8_t
}
/* compile all the streams */
- for (auto stream : streams) {
+ for (auto const stream : streams) {
/* skip non-enabled streams */
if (!stream->m_enabled) {
@@ -527,7 +532,7 @@ TrexStreamsCompiler::compile_on_all_cores(uint8_t
*
*/
void
-TrexStreamsCompiler::compile_stream(TrexStream *stream,
+TrexStreamsCompiler::compile_stream(const TrexStream *stream,
double factor,
uint8_t dp_core_count,
std::vector<TrexStreamsCompiledObj *> &objs,
@@ -543,31 +548,25 @@ TrexStreamsCompiler::compile_stream(TrexStream *stream,
new_next_id = nodes.get(stream->m_next_stream_id)->m_compressed_stream_id;
}
- TrexStream *fixed_rx_flow_stat_stream = stream->clone(true);
-
- get_stateless_obj()->m_rx_flow_stat.start_stream(fixed_rx_flow_stat_stream);
- // CFlowStatRuleMgr keeps state of the stream object. We duplicated the stream here (in order not
- // change the packet kept in the stream). We want the state to be saved in the original stream.
- get_stateless_obj()->m_rx_flow_stat.copy_state(fixed_rx_flow_stat_stream, stream);
-
- fixed_rx_flow_stat_stream->update_rate_factor(factor);
+ /* we clone because we alter the stream now */
+ std::unique_ptr<TrexStream> tmp_stream(stream->clone(true));
+ tmp_stream->update_rate_factor(factor);
/* can this stream be split to many cores ? */
if ( (dp_core_count == 1) || (!stream->is_splitable(dp_core_count)) ) {
- compile_stream_on_single_core(fixed_rx_flow_stat_stream,
+ compile_stream_on_single_core(tmp_stream.get(),
dp_core_count,
objs,
new_id,
new_next_id);
} else {
- compile_stream_on_all_cores(fixed_rx_flow_stat_stream,
+ compile_stream_on_all_cores(tmp_stream.get(),
dp_core_count,
objs,
new_id,
new_next_id);
}
- delete fixed_rx_flow_stat_stream;
}
/**
@@ -925,7 +924,7 @@ TrexStreamsGraphObj::find_max_rate() {
}
/* if not mark as inifite - get the last event time */
- if (m_expected_duration != -1) {
+ if ( (m_expected_duration != -1) && (m_rate_events.size() > 0) ) {
m_expected_duration = m_rate_events.back().time;
}
diff --git a/src/stateless/cp/trex_streams_compiler.h b/src/stateless/cp/trex_streams_compiler.h
index 0ce71b49..171e3aff 100644
--- a/src/stateless/cp/trex_streams_compiler.h
+++ b/src/stateless/cp/trex_streams_compiler.h
@@ -141,7 +141,7 @@ private:
bool all_continues);
- void compile_stream(TrexStream *stream,
+ void compile_stream(const TrexStream *stream,
double factor,
uint8_t dp_core_count,
std::vector<TrexStreamsCompiledObj *> &objs,
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index fe78c5b2..58d8f21a 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -258,23 +258,20 @@ rte_mbuf_t * CGenNodeStateless::alloc_flow_stat_mbuf(rte_mbuf_t *m, struct flow_
fsp_head = (struct flow_stat_payload_header *)(p + rte_pktmbuf_data_len(m) - fsp_head_size);
return m;
} else {
- // r/w --> read only. Should do something like:
- // Alloc indirect,. make r/w->indirect point to read_only) -> new fsp_header
- // for the mean time, just copy the entire packet.
- m_ret = CGlobalInfo::pktmbuf_alloc( get_socket_id(), rte_pktmbuf_pkt_len(m) );
- assert(m_ret);
- char *p_new = rte_pktmbuf_append(m_ret, rte_pktmbuf_pkt_len(m));
- rte_mbuf_t *m_free = m;
- while (m != NULL) {
- char *p = rte_pktmbuf_mtod(m, char*);
- memcpy(p_new, p, m->data_len);
- p_new += m->data_len;
- m = m->next;
- }
- p_new = rte_pktmbuf_mtod(m_ret, char*);
- fsp_head = (struct flow_stat_payload_header *)(p_new + rte_pktmbuf_data_len(m_ret) - fsp_head_size);
- rte_pktmbuf_free(m_free);
- return m_ret;
+ // We have: r/w --> read only.
+ // Changing to:
+ // (original) r/w -> (new) indirect (direct is original read_only, after trimming last bytes) -> (new) latency info
+ rte_mbuf_t *m_read_only = m->next, *m_indirect;
+
+ m_indirect = CGlobalInfo::pktmbuf_alloc_small(get_socket_id());
+ assert(m_indirect);
+ // alloc mbuf just for the latency header
+ m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size);
+ assert(m_lat);
+ fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size);
+ utl_rte_pktmbuf_chain_with_indirect(m, m_indirect, m_read_only, m_lat);
+ m_indirect->data_len = (uint16_t)(m_indirect->data_len - fsp_head_size);
+ return m;
}
}
}
@@ -910,6 +907,10 @@ TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
uint8_t hw_id = stream->m_rx_check.m_hw_id;
assert (hw_id < MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD);
node->set_stat_hw_id(hw_id);
+ // no support for cache with flow stat payload rules
+ if ((TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type == TrexPlatformApi::IF_STAT_PAYLOAD) {
+ stream->m_cache_size = 0;
+ }
}
/* set socket id */
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index af2187ae..31cb0be3 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -114,7 +114,7 @@ class TrexStatelessDpCore {
public:
- #define SCHD_OFFSET_DTIME (10.0/1000000.0)
+ #define SCHD_OFFSET_DTIME (100.0/1000000.0)
/* states */
enum state_e {
diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp
index b3555c13..853fc868 100644
--- a/src/stateless/rx/trex_stateless_rx_core.cpp
+++ b/src/stateless/rx/trex_stateless_rx_core.cpp
@@ -29,13 +29,21 @@
void CRFC2544Info::create() {
m_latency.Create();
- // This is the seq num value we expect next packet to have.
- // Init value should match m_seq_num in CVirtualIFPerSideStats
- m_seq = UINT32_MAX - 1; // catch wrap around issues early
+ m_exp_flow_seq = 0;
+ m_prev_flow_seq = 0;
reset();
}
+// after calling stop, packets still arriving will be considered error
+void CRFC2544Info::stop() {
+ m_prev_flow_seq = m_exp_flow_seq;
+ m_exp_flow_seq = FLOW_STAT_PAYLOAD_INITIAL_FLOW_SEQ;
+}
+
void CRFC2544Info::reset() {
+ // This is the seq num value we expect next packet to have.
+ // Init value should match m_seq_num in CVirtualIFPerSideStats
+ m_seq = UINT32_MAX - 1; // catch wrap around issues early
m_seq_err = 0;
m_seq_err_events_too_big = 0;
m_seq_err_events_too_low = 0;
@@ -72,9 +80,6 @@ void CRxCoreStateless::create(const CRxSlCfg &cfg) {
m_ring_to_cp = cp_rx->getRingDpToCp(0);
m_state = STATE_IDLE;
- m_watchdog_handle = -1;
- m_watchdog = NULL;
-
for (int i = 0; i < m_max_ports; i++) {
CLatencyManagerPerPortStl * lp = &m_ports[i];
lp->m_io = cfg.m_ports[i];
@@ -93,14 +98,14 @@ void CRxCoreStateless::handle_cp_msg(TrexStatelessCpToRxMsgBase *msg) {
}
void CRxCoreStateless::tickle() {
- m_watchdog->tickle(m_watchdog_handle);
+ m_monitor.tickle();
}
bool CRxCoreStateless::periodic_check_for_cp_messages() {
/* tickle the watchdog */
tickle();
-
+
/* fast path */
if ( likely ( m_ring_from_cp->isEmpty() ) ) {
return false;
@@ -147,14 +152,14 @@ void CRxCoreStateless::idle_state_loop() {
}
}
-void CRxCoreStateless::start(TrexWatchDog &watchdog) {
+void CRxCoreStateless::start() {
int count = 0;
int i = 0;
bool do_try_rx_queue =CGlobalInfo::m_options.preview.get_vm_one_queue_enable() ? true : false;
/* register a watchdog handle on current core */
- m_watchdog = &watchdog;
- m_watchdog_handle = watchdog.register_monitor("STL RX CORE", 1);
+ m_monitor.create("STL RX CORE", 1);
+ TrexWatchDog::getInstance().register_monitor(&m_monitor);
while (true) {
if (m_state == STATE_WORKING) {
@@ -179,7 +184,7 @@ void CRxCoreStateless::start(TrexWatchDog &watchdog) {
}
rte_pause();
- m_watchdog->disable_monitor(m_watchdog_handle);
+ m_monitor.disable();
}
void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPortStl *lp, rte_mbuf_t *m) {
@@ -190,63 +195,96 @@ void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPortStl *lp, rte_mbuf_t *
if (parser.get_ip_id(ip_id) == 0) {
if (is_flow_stat_id(ip_id)) {
uint16_t hw_id;
+
if (is_flow_stat_payload_id(ip_id)) {
+ bool good_packet = true;
uint8_t *p = rte_pktmbuf_mtod(m, uint8_t*);
struct flow_stat_payload_header *fsp_head = (struct flow_stat_payload_header *)
(p + m->pkt_len - sizeof(struct flow_stat_payload_header));
- if (fsp_head->magic == FLOW_STAT_PAYLOAD_MAGIC) {
- hw_id = fsp_head->hw_id;
- CRFC2544Info &curr_rfc2544 = m_rfc2544[hw_id];
+ hw_id = fsp_head->hw_id;
+ CRFC2544Info *curr_rfc2544;
+
+ if (unlikely(fsp_head->magic != FLOW_STAT_PAYLOAD_MAGIC) || hw_id >= MAX_FLOW_STATS_PAYLOAD) {
+ good_packet = false;
+ m_err_cntrs.m_bad_header++;
+ } else {
+ curr_rfc2544 = &m_rfc2544[hw_id];
+
+ if (fsp_head->flow_seq != curr_rfc2544->get_exp_flow_seq()) {
+ // bad flow seq num
+ // Might be the first packet of a new flow, packet from an old flow, or garbage.
+
+ if (fsp_head->flow_seq == curr_rfc2544->get_prev_flow_seq()) {
+ // packet from previous flow using this hw_id that arrived late
+ good_packet = false;
+ m_err_cntrs.m_old_flow++;
+ } else {
+ if (curr_rfc2544->no_flow_seq()) {
+ // first packet we see from this flow
+ good_packet = true;
+ curr_rfc2544->set_exp_flow_seq(fsp_head->flow_seq);
+ } else {
+ // garbage packet
+ good_packet = false;
+ m_err_cntrs.m_bad_header++;
+ }
+ }
+ }
+ }
+
+ if (good_packet) {
uint32_t pkt_seq = fsp_head->seq;
- uint32_t exp_seq = curr_rfc2544.get_seq();
+ uint32_t exp_seq = curr_rfc2544->get_seq();
if (unlikely(pkt_seq != exp_seq)) {
if (pkt_seq < exp_seq) {
if (exp_seq - pkt_seq > 100000) {
// packet loss while we had wrap around
- curr_rfc2544.inc_seq_err(pkt_seq - exp_seq);
- curr_rfc2544.inc_seq_err_too_big();
- curr_rfc2544.set_seq(pkt_seq + 1);
+ curr_rfc2544->inc_seq_err(pkt_seq - exp_seq);
+ curr_rfc2544->inc_seq_err_too_big();
+ curr_rfc2544->set_seq(pkt_seq + 1);
} else {
if (pkt_seq == (exp_seq - 1)) {
- curr_rfc2544.inc_dup();
+ curr_rfc2544->inc_dup();
} else {
- curr_rfc2544.inc_ooo();
+ curr_rfc2544->inc_ooo();
// We thought it was lost, but it was just out of order
- curr_rfc2544.dec_seq_err();
+ curr_rfc2544->dec_seq_err();
}
- curr_rfc2544.inc_seq_err_too_low();
+ curr_rfc2544->inc_seq_err_too_low();
}
} else {
if (unlikely (pkt_seq - exp_seq > 100000)) {
// packet reorder while we had wrap around
if (pkt_seq == (exp_seq - 1)) {
- curr_rfc2544.inc_dup();
+ curr_rfc2544->inc_dup();
} else {
- curr_rfc2544.inc_ooo();
+ curr_rfc2544->inc_ooo();
// We thought it was lost, but it was just out of order
- curr_rfc2544.dec_seq_err();
+ curr_rfc2544->dec_seq_err();
}
- curr_rfc2544.inc_seq_err_too_low();
+ curr_rfc2544->inc_seq_err_too_low();
} else {
- // seq > curr_rfc2544.seq. Assuming lost packets
- curr_rfc2544.inc_seq_err(pkt_seq - exp_seq);
- curr_rfc2544.inc_seq_err_too_big();
- curr_rfc2544.set_seq(pkt_seq + 1);
+ // seq > curr_rfc2544->seq. Assuming lost packets
+ curr_rfc2544->inc_seq_err(pkt_seq - exp_seq);
+ curr_rfc2544->inc_seq_err_too_big();
+ curr_rfc2544->set_seq(pkt_seq + 1);
}
}
} else {
- curr_rfc2544.set_seq(pkt_seq + 1);
+ curr_rfc2544->set_seq(pkt_seq + 1);
}
lp->m_port.m_rx_pg_stat_payload[hw_id].add_pkts(1);
- lp->m_port.m_rx_pg_stat_payload[hw_id].add_bytes(m->pkt_len);
+ lp->m_port.m_rx_pg_stat_payload[hw_id].add_bytes(m->pkt_len + 4); // +4 for ethernet CRC
uint64_t d = (os_get_hr_tick_64() - fsp_head->time_stamp );
dsec_t ctime = ptime_convert_hr_dsec(d);
- curr_rfc2544.add_sample(ctime);
+ curr_rfc2544->add_sample(ctime);
}
} else {
hw_id = get_hw_id(ip_id);
- lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1);
- lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len);
+ if (hw_id < MAX_FLOW_STATS) {
+ lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1);
+ lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len + 4); // +4 for ethernet CRC
+ }
}
}
}
@@ -391,6 +429,12 @@ int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int
int CRxCoreStateless::get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset) {
for (int hw_id = min; hw_id <= max; hw_id++) {
CRFC2544Info &curr_rfc2544 = m_rfc2544[hw_id];
+
+ if (reset) {
+ // need to stop first, so count will be consistent
+ curr_rfc2544.stop();
+ }
+
curr_rfc2544.sample_period_end();
curr_rfc2544.export_data(rfc2544_info[hw_id - min]);
@@ -401,6 +445,11 @@ int CRxCoreStateless::get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, in
return 0;
}
+int CRxCoreStateless::get_rx_err_cntrs(CRxCoreErrCntrs *rx_err) {
+ *rx_err = m_err_cntrs;
+ return 0;
+}
+
void CRxCoreStateless::set_working_msg_ack(bool val) {
sanb_smp_memory_barrier();
m_ack_start_work_msg = val;
diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h
index ce1bc1ad..fc66704e 100644
--- a/src/stateless/rx/trex_stateless_rx_core.h
+++ b/src/stateless/rx/trex_stateless_rx_core.h
@@ -59,6 +59,7 @@ class CRxSlCfg {
class CRFC2544Info {
public:
void create();
+ void stop();
void reset();
void export_data(rfc2544_info_t_ &obj);
inline void add_sample(double stime) {
@@ -76,6 +77,10 @@ class CRFC2544Info {
inline void inc_seq_err_too_low() {m_seq_err_events_too_low++;}
inline void inc_dup() {m_dup++;}
inline void inc_ooo() {m_ooo++;}
+ inline uint16_t get_exp_flow_seq() {return m_exp_flow_seq;}
+ inline void set_exp_flow_seq(uint16_t flow_seq) {m_exp_flow_seq = flow_seq;}
+ inline uint16_t get_prev_flow_seq() {return m_prev_flow_seq;}
+ inline bool no_flow_seq() {return (m_exp_flow_seq == FLOW_STAT_PAYLOAD_INITIAL_FLOW_SEQ) ? true : false;}
private:
uint32_t m_seq; // expected next seq num
CTimeHistogram m_latency; // latency info
@@ -85,6 +90,28 @@ class CRFC2544Info {
uint64_t m_seq_err_events_too_low; // How many packet seq num lower than expected events we had
uint64_t m_ooo; // Packets we got with seq num lower than expected (We guess they are out of order)
uint64_t m_dup; // Packets we got with same seq num
+ uint16_t m_exp_flow_seq; // flow sequence number we should see in latency header
+ // flow sequence number previously used with this id. We use this to catch packets arriving late from an old flow
+ uint16_t m_prev_flow_seq;
+};
+
+class CRxCoreErrCntrs {
+ friend CRxCoreStateless;
+
+ public:
+ uint64_t get_bad_header() {return m_bad_header;}
+ uint64_t get_old_flow() {return m_old_flow;}
+ CRxCoreErrCntrs() {
+ reset();
+ }
+ void reset() {
+ m_bad_header = 0;
+ m_old_flow = 0;
+ }
+
+ private:
+ uint64_t m_bad_header;
+ uint64_t m_old_flow;
};
class CRxCoreStateless {
@@ -95,13 +122,17 @@ class CRxCoreStateless {
};
public:
- void start(TrexWatchDog &watchdog);
+ void start();
void create(const CRxSlCfg &cfg);
void reset_rx_stats(uint8_t port_id);
int get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset
, TrexPlatformApi::driver_stat_cap_e type);
int get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset);
- void work() {m_state = STATE_WORKING;}
+ int get_rx_err_cntrs(CRxCoreErrCntrs *rx_err);
+ void work() {
+ m_state = STATE_WORKING;
+ m_err_cntrs.reset(); // When starting to work, reset global counters
+ }
void idle() {m_state = STATE_IDLE;}
void quit() {m_state = STATE_QUIT;}
bool is_working() const {return (m_ack_start_work_msg == true);}
@@ -126,8 +157,7 @@ class CRxCoreStateless {
private:
- TrexWatchDog *m_watchdog;
- int m_watchdog_handle;
+ TrexMonitor m_monitor;
uint32_t m_max_ports;
bool m_has_streams;
@@ -139,6 +169,7 @@ class CRxCoreStateless {
CCpuUtlCp m_cpu_cp_u;
// Used for acking "work" (go out of idle) messages from cp
volatile bool m_ack_start_work_msg __rte_cache_aligned;
+ CRxCoreErrCntrs m_err_cntrs;
CRFC2544Info m_rfc2544[MAX_FLOW_STATS_PAYLOAD];
};
#endif