diff options
author | 2016-07-31 11:56:41 +0300 | |
---|---|---|
committer | 2016-07-31 11:56:41 +0300 | |
commit | 893d0feef9ba6fa3fb36c49f4b5bcad47cb2bf60 (patch) | |
tree | 689a09fa656f990672d2d62143dc173a46fe0316 /src/stateless | |
parent | abf329075bd14f5f41c3753d560260ac809ec4f3 (diff) | |
parent | dceb010b01e9f8a0e9c905370d39f149f01cab7e (diff) |
Merge branch 'master' into scapy_server
Diffstat (limited to 'src/stateless')
-rw-r--r-- | src/stateless/cp/trex_stateless.cpp | 38 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless.h | 6 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.cpp | 81 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.h | 1 | ||||
-rw-r--r-- | src/stateless/cp/trex_stream.cpp | 2 | ||||
-rw-r--r-- | src/stateless/cp/trex_stream.h | 2 | ||||
-rw-r--r-- | src/stateless/cp/trex_stream_vm.cpp | 72 | ||||
-rw-r--r-- | src/stateless/cp/trex_stream_vm.h | 6 | ||||
-rw-r--r-- | src/stateless/cp/trex_streams_compiler.cpp | 33 | ||||
-rw-r--r-- | src/stateless/cp/trex_streams_compiler.h | 2 | ||||
-rw-r--r-- | src/stateless/dp/trex_stateless_dp_core.cpp | 35 | ||||
-rw-r--r-- | src/stateless/dp/trex_stateless_dp_core.h | 2 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.cpp | 121 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.h | 39 |
14 files changed, 343 insertions, 97 deletions
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index 698ede90..6d80539c 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -54,7 +54,11 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) { m_publisher = cfg.m_publisher; /* API core version */ - m_api_classes[APIClass::API_CLASS_TYPE_CORE].init(APIClass::API_CLASS_TYPE_CORE, 1, 2); + const int API_VER_MAJOR = 1; + const int API_VER_MINOR = 3; + m_api_classes[APIClass::API_CLASS_TYPE_CORE].init(APIClass::API_CLASS_TYPE_CORE, + API_VER_MAJOR, + API_VER_MINOR); } /** @@ -64,6 +68,8 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) { */ TrexStateless::~TrexStateless() { + shutdown(); + /* release memory for ports */ for (auto port : m_ports) { delete port; @@ -71,15 +77,33 @@ TrexStateless::~TrexStateless() { m_ports.clear(); /* stops the RPC server */ - m_rpc_server->stop(); - delete m_rpc_server; - - m_rpc_server = NULL; + if (m_rpc_server) { + delete m_rpc_server; + m_rpc_server = NULL; + } - delete m_platform_api; - m_platform_api = NULL; + if (m_platform_api) { + delete m_platform_api; + m_platform_api = NULL; + } } +/** +* shutdown the server +*/ +void TrexStateless::shutdown() { + + /* stop ports */ + for (TrexStatelessPort *port : m_ports) { + /* safe to call stop even if not active */ + port->stop_traffic(); + } + + /* shutdown the RPC server */ + if (m_rpc_server) { + m_rpc_server->stop(); + } +} /** * starts the control plane side diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index 83ab6976..7ea669df 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -132,6 +132,11 @@ public: /** + * shutdown the server + */ + void shutdown(); + + /** * fetch all the stats * */ @@ -188,6 +193,7 @@ protected: /* API */ APIClass m_api_classes[APIClass::API_CLASS_TYPE_MAX]; + }; /** diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 4dc3e449..0fe4b410 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -87,6 +87,68 @@ protected: } }; +/************************************* + * Streams Feeder + * A class that holds a temporary + * clone of streams that can be + * manipulated + * + * this is a RAII object meant for + * graceful cleanup + ************************************/ +class StreamsFeeder { +public: + StreamsFeeder(TrexStatelessPort *port) { + + /* start pesimistic */ + m_success = false; + + /* fetch the original streams */ + port->get_object_list(m_in_streams); + + for (const TrexStream *in_stream : m_in_streams) { + TrexStream *out_stream = in_stream->clone(true); + + get_stateless_obj()->m_rx_flow_stat.start_stream(out_stream); + + m_out_streams.push_back(out_stream); + } + } + + void set_status(bool status) { + m_success = status; + } + + vector<TrexStream *> &get_streams() { + return m_out_streams; + } + + /** + * RAII + */ + ~StreamsFeeder() { + for (int i = 0; i < m_out_streams.size(); i++) { + TrexStream *out_stream = m_out_streams[i]; + TrexStream *in_stream = m_in_streams[i]; + + if (m_success) { + /* success path */ + get_stateless_obj()->m_rx_flow_stat.copy_state(out_stream, in_stream); + } else { + /* fail path */ + get_stateless_obj()->m_rx_flow_stat.stop_stream(out_stream); + } + delete out_stream; + } + } + +private: + vector<TrexStream *> m_in_streams; + vector<TrexStream *> m_out_streams; + bool m_success; +}; + + /*************************** * trex stateless port * @@ -193,10 +255,7 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration, /* caclulate the effective factor for DP */ double factor = calculate_effective_factor(mul, force); - /* fetch all the streams from the table */ - vector<TrexStream *> streams; - get_object_list(streams); - + StreamsFeeder feeder(this); /* compiler it */ std::vector<TrexStreamsCompiledObj *> compiled_objs; @@ -204,15 +263,19 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration, TrexStreamsCompiler compiler; bool rc = compiler.compile(m_port_id, - streams, + feeder.get_streams(), compiled_objs, get_dp_core_count(), factor, &fail_msg); + if (!rc) { + feeder.set_status(false); throw TrexException(fail_msg); } + feeder.set_status(true); + /* generate a message to all the relevant DP cores to start transmitting */ assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID); m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent()); @@ -628,6 +691,9 @@ TrexStatelessPort::calculate_effective_factor_internal(const TrexPortMultiplier case TrexPortMultiplier::MUL_BPS: return m_graph_obj->get_factor_bps_l2(mul.m_value); + case TrexPortMultiplier::MUL_BPSL1: + return m_graph_obj->get_factor_bps_l1(mul.m_value); + case TrexPortMultiplier::MUL_PPS: return m_graph_obj->get_factor_pps(mul.m_value); @@ -678,7 +744,7 @@ TrexStatelessPort::delete_streams_graph() { * port multiplier * **************************/ -const std::initializer_list<std::string> TrexPortMultiplier::g_types = {"raw", "bps", "pps", "percentage"}; +const std::initializer_list<std::string> TrexPortMultiplier::g_types = {"raw", "bps", "bpsl1", "pps", "percentage"}; const std::initializer_list<std::string> TrexPortMultiplier::g_ops = {"abs", "add", "sub"}; TrexPortMultiplier:: @@ -692,6 +758,9 @@ TrexPortMultiplier(const std::string &type_str, const std::string &op_str, doubl } else if (type_str == "bps") { type = MUL_BPS; + } else if (type_str == "bpsl1") { + type = MUL_BPSL1; + } else if (type_str == "pps") { type = MUL_PPS; diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index 8856b429..915c5325 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -477,6 +477,7 @@ public: enum mul_type_e { MUL_FACTOR, MUL_BPS, + MUL_BPSL1, MUL_PPS, MUL_PERCENTAGE }; diff --git a/src/stateless/cp/trex_stream.cpp b/src/stateless/cp/trex_stream.cpp index adc08ae2..7a3dfef7 100644 --- a/src/stateless/cp/trex_stream.cpp +++ b/src/stateless/cp/trex_stream.cpp @@ -141,7 +141,7 @@ TrexStream::TrexStream(uint8_t type, m_pkt.binary = NULL; m_pkt.len = 0; - m_expected_pkt_len = 0; + m_expected_pkt_len = 0.0; m_rx_check.m_enabled = false; diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h index 8ac5ebc8..0f59356d 100644 --- a/src/stateless/cp/trex_stream.h +++ b/src/stateless/cp/trex_stream.h @@ -558,7 +558,7 @@ public: StreamVmDp *m_vm_dp; CStreamPktData m_pkt; - uint16_t m_expected_pkt_len; + double m_expected_pkt_len; /* pkt */ diff --git a/src/stateless/cp/trex_stream_vm.cpp b/src/stateless/cp/trex_stream_vm.cpp index 7a1dc122..9e4fbe1c 100644 --- a/src/stateless/cp/trex_stream_vm.cpp +++ b/src/stateless/cp/trex_stream_vm.cpp @@ -27,7 +27,64 @@ limitations under the License. #include <common/Network/Packet/IPHeader.h> #include <common/basic_utils.h> +/** + * provides some tools for the fast rand function + * that is used by the datapath + * some features of this function is different + * from a regular random + * (such as average can be off by few percents) + * + * @author imarom (7/24/2016) + */ +class FastRandUtils { +public: + + /** + * searches the target in the cache + * if not found iterativly calculate it + * and add it to the cache + * + */ + double calc_fastrand_avg(uint16_t target) { + auto search = m_avg_cache.find(target); + if (search != m_avg_cache.end()) { + return search->second; + } + + /* not found - calculate it */ + double avg = iterate_calc(target); + + /* if there is enough space - to the cache */ + if (m_avg_cache.size() <= G_MAX_CACHE_SIZE) { + m_avg_cache[target] = avg; + } + + return avg; + } +private: + + /** + * hard calculate a value using iterations + * + */ + double iterate_calc(uint16_t target) { + const int num_samples = 10000; + uint64_t tmp = 0; + uint32_t seed = 1; + + for (int i = 0; i < num_samples; i++) { + tmp += fastrand(seed) % (target + 1); + } + + return (tmp / double(num_samples)); + } + + std::unordered_map<uint16_t, double> m_avg_cache; + static const uint16_t G_MAX_CACHE_SIZE = 9230; +}; + +static FastRandUtils g_fastrand_util; void StreamVmInstructionFixChecksumIpv4::Dump(FILE *fd){ @@ -350,9 +407,18 @@ void StreamVm::build_flow_var_table() { var.m_ins.m_ins_flowv->m_min_value =60; } - m_expected_pkt_size = (var.m_ins.m_ins_flowv->m_min_value + var.m_ins.m_ins_flowv->m_max_value) / 2; + /* expected packet size calculation */ + + /* for random packet size - we need to find the average */ + if (var.m_ins.m_ins_flowv->m_op == StreamVmInstructionFlowMan::FLOW_VAR_OP_RANDOM) { + uint16_t range = var.m_ins.m_ins_flowv->m_max_value - var.m_ins.m_ins_flowv->m_min_value; + m_expected_pkt_size = var.m_ins.m_ins_flowv->m_min_value + g_fastrand_util.calc_fastrand_avg(range); + } else { + m_expected_pkt_size = (var.m_ins.m_ins_flowv->m_min_value + var.m_ins.m_ins_flowv->m_max_value) / 2.0; + } + } - }/* for */ + } } @@ -962,7 +1028,7 @@ StreamVm::~StreamVm() { * calculate expected packet size of stream's VM * */ -uint16_t +double StreamVm::calc_expected_pkt_size(uint16_t regular_pkt_size) const { /* if no packet size change - simply return the regular packet size */ diff --git a/src/stateless/cp/trex_stream_vm.h b/src/stateless/cp/trex_stream_vm.h index 58d43bd4..f55d33c1 100644 --- a/src/stateless/cp/trex_stream_vm.h +++ b/src/stateless/cp/trex_stream_vm.h @@ -1417,7 +1417,7 @@ public: m_prefix_size=0; m_bss=0; m_pkt_size=0; - m_expected_pkt_size=0; + m_expected_pkt_size=0.0; m_cur_var_offset=0; m_is_random_var=false; @@ -1435,7 +1435,7 @@ public: * if the VM changes the packet length (random) * */ - uint16_t calc_expected_pkt_size(uint16_t regular_pkt_size) const; + double calc_expected_pkt_size(uint16_t regular_pkt_size) const; @@ -1576,7 +1576,7 @@ private: uint16_t m_prefix_size; uint16_t m_pkt_size; - uint16_t m_expected_pkt_size; + double m_expected_pkt_size; uint16_t m_cur_var_offset; uint16_t m_max_field_update; /* the location of the last byte that is going to be changed in the packet */ diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp index f2296aeb..e54c5f9c 100644 --- a/src/stateless/cp/trex_streams_compiler.cpp +++ b/src/stateless/cp/trex_streams_compiler.cpp @@ -382,7 +382,13 @@ TrexStreamsCompiler::compile(uint8_t port_id, assert(dp_core_count > 0); try { - return compile_internal(port_id,streams,objs,dp_core_count,factor,fail_msg); + return compile_internal(port_id, + streams, + objs, + dp_core_count, + factor, + fail_msg); + } catch (const TrexException &ex) { if (fail_msg) { *fail_msg = ex.what(); @@ -411,7 +417,6 @@ TrexStreamsCompiler::compile_internal(uint8_t por GraphNodeMap nodes; - /* compile checks */ pre_compile_check(streams, nodes); @@ -474,7 +479,7 @@ TrexStreamsCompiler::compile_on_single_core(uint8_t } /* compile all the streams */ - for (auto stream : streams) { + for (auto const stream : streams) { /* skip non-enabled streams */ if (!stream->m_enabled) { @@ -507,7 +512,7 @@ TrexStreamsCompiler::compile_on_all_cores(uint8_t } /* compile all the streams */ - for (auto stream : streams) { + for (auto const stream : streams) { /* skip non-enabled streams */ if (!stream->m_enabled) { @@ -527,7 +532,7 @@ TrexStreamsCompiler::compile_on_all_cores(uint8_t * */ void -TrexStreamsCompiler::compile_stream(TrexStream *stream, +TrexStreamsCompiler::compile_stream(const TrexStream *stream, double factor, uint8_t dp_core_count, std::vector<TrexStreamsCompiledObj *> &objs, @@ -543,31 +548,25 @@ TrexStreamsCompiler::compile_stream(TrexStream *stream, new_next_id = nodes.get(stream->m_next_stream_id)->m_compressed_stream_id; } - TrexStream *fixed_rx_flow_stat_stream = stream->clone(true); - - get_stateless_obj()->m_rx_flow_stat.start_stream(fixed_rx_flow_stat_stream); - // CFlowStatRuleMgr keeps state of the stream object. We duplicated the stream here (in order not - // change the packet kept in the stream). We want the state to be saved in the original stream. - get_stateless_obj()->m_rx_flow_stat.copy_state(fixed_rx_flow_stat_stream, stream); - - fixed_rx_flow_stat_stream->update_rate_factor(factor); + /* we clone because we alter the stream now */ + std::unique_ptr<TrexStream> tmp_stream(stream->clone(true)); + tmp_stream->update_rate_factor(factor); /* can this stream be split to many cores ? */ if ( (dp_core_count == 1) || (!stream->is_splitable(dp_core_count)) ) { - compile_stream_on_single_core(fixed_rx_flow_stat_stream, + compile_stream_on_single_core(tmp_stream.get(), dp_core_count, objs, new_id, new_next_id); } else { - compile_stream_on_all_cores(fixed_rx_flow_stat_stream, + compile_stream_on_all_cores(tmp_stream.get(), dp_core_count, objs, new_id, new_next_id); } - delete fixed_rx_flow_stat_stream; } /** @@ -925,7 +924,7 @@ TrexStreamsGraphObj::find_max_rate() { } /* if not mark as inifite - get the last event time */ - if (m_expected_duration != -1) { + if ( (m_expected_duration != -1) && (m_rate_events.size() > 0) ) { m_expected_duration = m_rate_events.back().time; } diff --git a/src/stateless/cp/trex_streams_compiler.h b/src/stateless/cp/trex_streams_compiler.h index 0ce71b49..171e3aff 100644 --- a/src/stateless/cp/trex_streams_compiler.h +++ b/src/stateless/cp/trex_streams_compiler.h @@ -141,7 +141,7 @@ private: bool all_continues); - void compile_stream(TrexStream *stream, + void compile_stream(const TrexStream *stream, double factor, uint8_t dp_core_count, std::vector<TrexStreamsCompiledObj *> &objs, diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index fe78c5b2..58d8f21a 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -258,23 +258,20 @@ rte_mbuf_t * CGenNodeStateless::alloc_flow_stat_mbuf(rte_mbuf_t *m, struct flow_ fsp_head = (struct flow_stat_payload_header *)(p + rte_pktmbuf_data_len(m) - fsp_head_size); return m; } else { - // r/w --> read only. Should do something like: - // Alloc indirect,. make r/w->indirect point to read_only) -> new fsp_header - // for the mean time, just copy the entire packet. - m_ret = CGlobalInfo::pktmbuf_alloc( get_socket_id(), rte_pktmbuf_pkt_len(m) ); - assert(m_ret); - char *p_new = rte_pktmbuf_append(m_ret, rte_pktmbuf_pkt_len(m)); - rte_mbuf_t *m_free = m; - while (m != NULL) { - char *p = rte_pktmbuf_mtod(m, char*); - memcpy(p_new, p, m->data_len); - p_new += m->data_len; - m = m->next; - } - p_new = rte_pktmbuf_mtod(m_ret, char*); - fsp_head = (struct flow_stat_payload_header *)(p_new + rte_pktmbuf_data_len(m_ret) - fsp_head_size); - rte_pktmbuf_free(m_free); - return m_ret; + // We have: r/w --> read only. + // Changing to: + // (original) r/w -> (new) indirect (direct is original read_only, after trimming last bytes) -> (new) latency info + rte_mbuf_t *m_read_only = m->next, *m_indirect; + + m_indirect = CGlobalInfo::pktmbuf_alloc_small(get_socket_id()); + assert(m_indirect); + // alloc mbuf just for the latency header + m_lat = CGlobalInfo::pktmbuf_alloc( get_socket_id(), fsp_head_size); + assert(m_lat); + fsp_head = (struct flow_stat_payload_header *)rte_pktmbuf_append(m_lat, fsp_head_size); + utl_rte_pktmbuf_chain_with_indirect(m, m_indirect, m_read_only, m_lat); + m_indirect->data_len = (uint16_t)(m_indirect->data_len - fsp_head_size); + return m; } } } @@ -910,6 +907,10 @@ TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port, uint8_t hw_id = stream->m_rx_check.m_hw_id; assert (hw_id < MAX_FLOW_STATS + MAX_FLOW_STATS_PAYLOAD); node->set_stat_hw_id(hw_id); + // no support for cache with flow stat payload rules + if ((TrexPlatformApi::driver_stat_cap_e)stream->m_rx_check.m_rule_type == TrexPlatformApi::IF_STAT_PAYLOAD) { + stream->m_cache_size = 0; + } } /* set socket id */ diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h index af2187ae..31cb0be3 100644 --- a/src/stateless/dp/trex_stateless_dp_core.h +++ b/src/stateless/dp/trex_stateless_dp_core.h @@ -114,7 +114,7 @@ class TrexStatelessDpCore { public: - #define SCHD_OFFSET_DTIME (10.0/1000000.0) + #define SCHD_OFFSET_DTIME (100.0/1000000.0) /* states */ enum state_e { diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index b3555c13..853fc868 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -29,13 +29,21 @@ void CRFC2544Info::create() { m_latency.Create(); - // This is the seq num value we expect next packet to have. - // Init value should match m_seq_num in CVirtualIFPerSideStats - m_seq = UINT32_MAX - 1; // catch wrap around issues early + m_exp_flow_seq = 0; + m_prev_flow_seq = 0; reset(); } +// after calling stop, packets still arriving will be considered error +void CRFC2544Info::stop() { + m_prev_flow_seq = m_exp_flow_seq; + m_exp_flow_seq = FLOW_STAT_PAYLOAD_INITIAL_FLOW_SEQ; +} + void CRFC2544Info::reset() { + // This is the seq num value we expect next packet to have. + // Init value should match m_seq_num in CVirtualIFPerSideStats + m_seq = UINT32_MAX - 1; // catch wrap around issues early m_seq_err = 0; m_seq_err_events_too_big = 0; m_seq_err_events_too_low = 0; @@ -72,9 +80,6 @@ void CRxCoreStateless::create(const CRxSlCfg &cfg) { m_ring_to_cp = cp_rx->getRingDpToCp(0); m_state = STATE_IDLE; - m_watchdog_handle = -1; - m_watchdog = NULL; - for (int i = 0; i < m_max_ports; i++) { CLatencyManagerPerPortStl * lp = &m_ports[i]; lp->m_io = cfg.m_ports[i]; @@ -93,14 +98,14 @@ void CRxCoreStateless::handle_cp_msg(TrexStatelessCpToRxMsgBase *msg) { } void CRxCoreStateless::tickle() { - m_watchdog->tickle(m_watchdog_handle); + m_monitor.tickle(); } bool CRxCoreStateless::periodic_check_for_cp_messages() { /* tickle the watchdog */ tickle(); - + /* fast path */ if ( likely ( m_ring_from_cp->isEmpty() ) ) { return false; @@ -147,14 +152,14 @@ void CRxCoreStateless::idle_state_loop() { } } -void CRxCoreStateless::start(TrexWatchDog &watchdog) { +void CRxCoreStateless::start() { int count = 0; int i = 0; bool do_try_rx_queue =CGlobalInfo::m_options.preview.get_vm_one_queue_enable() ? true : false; /* register a watchdog handle on current core */ - m_watchdog = &watchdog; - m_watchdog_handle = watchdog.register_monitor("STL RX CORE", 1); + m_monitor.create("STL RX CORE", 1); + TrexWatchDog::getInstance().register_monitor(&m_monitor); while (true) { if (m_state == STATE_WORKING) { @@ -179,7 +184,7 @@ void CRxCoreStateless::start(TrexWatchDog &watchdog) { } rte_pause(); - m_watchdog->disable_monitor(m_watchdog_handle); + m_monitor.disable(); } void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPortStl *lp, rte_mbuf_t *m) { @@ -190,63 +195,96 @@ void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPortStl *lp, rte_mbuf_t * if (parser.get_ip_id(ip_id) == 0) { if (is_flow_stat_id(ip_id)) { uint16_t hw_id; + if (is_flow_stat_payload_id(ip_id)) { + bool good_packet = true; uint8_t *p = rte_pktmbuf_mtod(m, uint8_t*); struct flow_stat_payload_header *fsp_head = (struct flow_stat_payload_header *) (p + m->pkt_len - sizeof(struct flow_stat_payload_header)); - if (fsp_head->magic == FLOW_STAT_PAYLOAD_MAGIC) { - hw_id = fsp_head->hw_id; - CRFC2544Info &curr_rfc2544 = m_rfc2544[hw_id]; + hw_id = fsp_head->hw_id; + CRFC2544Info *curr_rfc2544; + + if (unlikely(fsp_head->magic != FLOW_STAT_PAYLOAD_MAGIC) || hw_id >= MAX_FLOW_STATS_PAYLOAD) { + good_packet = false; + m_err_cntrs.m_bad_header++; + } else { + curr_rfc2544 = &m_rfc2544[hw_id]; + + if (fsp_head->flow_seq != curr_rfc2544->get_exp_flow_seq()) { + // bad flow seq num + // Might be the first packet of a new flow, packet from an old flow, or garbage. + + if (fsp_head->flow_seq == curr_rfc2544->get_prev_flow_seq()) { + // packet from previous flow using this hw_id that arrived late + good_packet = false; + m_err_cntrs.m_old_flow++; + } else { + if (curr_rfc2544->no_flow_seq()) { + // first packet we see from this flow + good_packet = true; + curr_rfc2544->set_exp_flow_seq(fsp_head->flow_seq); + } else { + // garbage packet + good_packet = false; + m_err_cntrs.m_bad_header++; + } + } + } + } + + if (good_packet) { uint32_t pkt_seq = fsp_head->seq; - uint32_t exp_seq = curr_rfc2544.get_seq(); + uint32_t exp_seq = curr_rfc2544->get_seq(); if (unlikely(pkt_seq != exp_seq)) { if (pkt_seq < exp_seq) { if (exp_seq - pkt_seq > 100000) { // packet loss while we had wrap around - curr_rfc2544.inc_seq_err(pkt_seq - exp_seq); - curr_rfc2544.inc_seq_err_too_big(); - curr_rfc2544.set_seq(pkt_seq + 1); + curr_rfc2544->inc_seq_err(pkt_seq - exp_seq); + curr_rfc2544->inc_seq_err_too_big(); + curr_rfc2544->set_seq(pkt_seq + 1); } else { if (pkt_seq == (exp_seq - 1)) { - curr_rfc2544.inc_dup(); + curr_rfc2544->inc_dup(); } else { - curr_rfc2544.inc_ooo(); + curr_rfc2544->inc_ooo(); // We thought it was lost, but it was just out of order - curr_rfc2544.dec_seq_err(); + curr_rfc2544->dec_seq_err(); } - curr_rfc2544.inc_seq_err_too_low(); + curr_rfc2544->inc_seq_err_too_low(); } } else { if (unlikely (pkt_seq - exp_seq > 100000)) { // packet reorder while we had wrap around if (pkt_seq == (exp_seq - 1)) { - curr_rfc2544.inc_dup(); + curr_rfc2544->inc_dup(); } else { - curr_rfc2544.inc_ooo(); + curr_rfc2544->inc_ooo(); // We thought it was lost, but it was just out of order - curr_rfc2544.dec_seq_err(); + curr_rfc2544->dec_seq_err(); } - curr_rfc2544.inc_seq_err_too_low(); + curr_rfc2544->inc_seq_err_too_low(); } else { - // seq > curr_rfc2544.seq. Assuming lost packets - curr_rfc2544.inc_seq_err(pkt_seq - exp_seq); - curr_rfc2544.inc_seq_err_too_big(); - curr_rfc2544.set_seq(pkt_seq + 1); + // seq > curr_rfc2544->seq. Assuming lost packets + curr_rfc2544->inc_seq_err(pkt_seq - exp_seq); + curr_rfc2544->inc_seq_err_too_big(); + curr_rfc2544->set_seq(pkt_seq + 1); } } } else { - curr_rfc2544.set_seq(pkt_seq + 1); + curr_rfc2544->set_seq(pkt_seq + 1); } lp->m_port.m_rx_pg_stat_payload[hw_id].add_pkts(1); - lp->m_port.m_rx_pg_stat_payload[hw_id].add_bytes(m->pkt_len); + lp->m_port.m_rx_pg_stat_payload[hw_id].add_bytes(m->pkt_len + 4); // +4 for ethernet CRC uint64_t d = (os_get_hr_tick_64() - fsp_head->time_stamp ); dsec_t ctime = ptime_convert_hr_dsec(d); - curr_rfc2544.add_sample(ctime); + curr_rfc2544->add_sample(ctime); } } else { hw_id = get_hw_id(ip_id); - lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1); - lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len); + if (hw_id < MAX_FLOW_STATS) { + lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1); + lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len + 4); // +4 for ethernet CRC + } } } } @@ -391,6 +429,12 @@ int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int int CRxCoreStateless::get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset) { for (int hw_id = min; hw_id <= max; hw_id++) { CRFC2544Info &curr_rfc2544 = m_rfc2544[hw_id]; + + if (reset) { + // need to stop first, so count will be consistent + curr_rfc2544.stop(); + } + curr_rfc2544.sample_period_end(); curr_rfc2544.export_data(rfc2544_info[hw_id - min]); @@ -401,6 +445,11 @@ int CRxCoreStateless::get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, in return 0; } +int CRxCoreStateless::get_rx_err_cntrs(CRxCoreErrCntrs *rx_err) { + *rx_err = m_err_cntrs; + return 0; +} + void CRxCoreStateless::set_working_msg_ack(bool val) { sanb_smp_memory_barrier(); m_ack_start_work_msg = val; diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index ce1bc1ad..fc66704e 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -59,6 +59,7 @@ class CRxSlCfg { class CRFC2544Info { public: void create(); + void stop(); void reset(); void export_data(rfc2544_info_t_ &obj); inline void add_sample(double stime) { @@ -76,6 +77,10 @@ class CRFC2544Info { inline void inc_seq_err_too_low() {m_seq_err_events_too_low++;} inline void inc_dup() {m_dup++;} inline void inc_ooo() {m_ooo++;} + inline uint16_t get_exp_flow_seq() {return m_exp_flow_seq;} + inline void set_exp_flow_seq(uint16_t flow_seq) {m_exp_flow_seq = flow_seq;} + inline uint16_t get_prev_flow_seq() {return m_prev_flow_seq;} + inline bool no_flow_seq() {return (m_exp_flow_seq == FLOW_STAT_PAYLOAD_INITIAL_FLOW_SEQ) ? true : false;} private: uint32_t m_seq; // expected next seq num CTimeHistogram m_latency; // latency info @@ -85,6 +90,28 @@ class CRFC2544Info { uint64_t m_seq_err_events_too_low; // How many packet seq num lower than expected events we had uint64_t m_ooo; // Packets we got with seq num lower than expected (We guess they are out of order) uint64_t m_dup; // Packets we got with same seq num + uint16_t m_exp_flow_seq; // flow sequence number we should see in latency header + // flow sequence number previously used with this id. We use this to catch packets arriving late from an old flow + uint16_t m_prev_flow_seq; +}; + +class CRxCoreErrCntrs { + friend CRxCoreStateless; + + public: + uint64_t get_bad_header() {return m_bad_header;} + uint64_t get_old_flow() {return m_old_flow;} + CRxCoreErrCntrs() { + reset(); + } + void reset() { + m_bad_header = 0; + m_old_flow = 0; + } + + private: + uint64_t m_bad_header; + uint64_t m_old_flow; }; class CRxCoreStateless { @@ -95,13 +122,17 @@ class CRxCoreStateless { }; public: - void start(TrexWatchDog &watchdog); + void start(); void create(const CRxSlCfg &cfg); void reset_rx_stats(uint8_t port_id); int get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset , TrexPlatformApi::driver_stat_cap_e type); int get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset); - void work() {m_state = STATE_WORKING;} + int get_rx_err_cntrs(CRxCoreErrCntrs *rx_err); + void work() { + m_state = STATE_WORKING; + m_err_cntrs.reset(); // When starting to work, reset global counters + } void idle() {m_state = STATE_IDLE;} void quit() {m_state = STATE_QUIT;} bool is_working() const {return (m_ack_start_work_msg == true);} @@ -126,8 +157,7 @@ class CRxCoreStateless { private: - TrexWatchDog *m_watchdog; - int m_watchdog_handle; + TrexMonitor m_monitor; uint32_t m_max_ports; bool m_has_streams; @@ -139,6 +169,7 @@ class CRxCoreStateless { CCpuUtlCp m_cpu_cp_u; // Used for acking "work" (go out of idle) messages from cp volatile bool m_ack_start_work_msg __rte_cache_aligned; + CRxCoreErrCntrs m_err_cntrs; CRFC2544Info m_rfc2544[MAX_FLOW_STATS_PAYLOAD]; }; #endif |