diff options
Diffstat (limited to 'src/stateless')
-rw-r--r-- | src/stateless/cp/trex_stateless.cpp | 1 | ||||
-rw-r--r-- | src/stateless/cp/trex_streams_compiler.cpp | 3 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.cpp | 10 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.h | 8 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.cpp | 150 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.h | 45 |
6 files changed, 147 insertions, 70 deletions
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index 9e24802b..9df57a50 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -132,6 +132,7 @@ TrexStateless::encode_stats(Json::Value &global) { api->get_global_stats(stats); global["cpu_util"] = stats.m_stats.m_cpu_util; + global["rx_cpu_util"] = stats.m_stats.m_rx_cpu_util; global["tx_bps"] = stats.m_stats.m_tx_bps; global["rx_bps"] = stats.m_stats.m_rx_bps; diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp index be5002da..563236c2 100644 --- a/src/stateless/cp/trex_streams_compiler.cpp +++ b/src/stateless/cp/trex_streams_compiler.cpp @@ -477,7 +477,8 @@ TrexStreamsCompiler::compile_stream(TrexStream *stream, TrexStream *fixed_rx_flow_stat_stream = stream->clone(true); - get_stateless_obj()->m_rx_flow_stat.start_stream(fixed_rx_flow_stat_stream, fixed_rx_flow_stat_stream->m_rx_check.m_hw_id); //???? check for errors + // not checking for errors. We assume that if add_stream succeeded, start_stream will too. + get_stateless_obj()->m_rx_flow_stat.start_stream(fixed_rx_flow_stat_stream, fixed_rx_flow_stat_stream->m_rx_check.m_hw_id); /* can this stream be split to many cores ? */ if (!stream->is_splitable(dp_core_count)) { diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index 3468d622..7edf0f13 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -209,13 +209,17 @@ TrexDpPortEventMsg::handle() { } /************************* messages from CP to RX **********************/ -bool TrexRxStartMsg::handle (CRxCoreStateless *rx_core) { +bool TrexStatelessRxStartMsg::handle (CRxCoreStateless *rx_core) { rx_core->work(); return true; } -/************************* messages from CP to RX **********************/ -bool TrexRxStopMsg::handle (CRxCoreStateless *rx_core) { +bool TrexStatelessRxStopMsg::handle (CRxCoreStateless *rx_core) { rx_core->idle(); return true; } + +bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) { + rx_core->quit(); + return true; +} diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index b7e8fd3f..0eed01bd 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -356,11 +356,15 @@ public: }; -class TrexRxStartMsg : public TrexStatelessCpToRxMsgBase { +class TrexStatelessRxStartMsg : public TrexStatelessCpToRxMsgBase { bool handle (CRxCoreStateless *rx_core); }; -class TrexRxStopMsg : public TrexStatelessCpToRxMsgBase { +class TrexStatelessRxStopMsg : public TrexStatelessCpToRxMsgBase { + bool handle (CRxCoreStateless *rx_core); +}; + +class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase { bool handle (CRxCoreStateless *rx_core); }; diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index 86711189..ab7c08d1 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -18,6 +18,7 @@ void CRxCoreStateless::create(const CRxSlCfg &cfg) { CLatencyManagerPerPort * lp = &m_ports[i]; lp->m_io = cfg.m_ports[i]; } + m_cpu_cp_u.Create(&m_cpu_dp_u); } void CRxCoreStateless::handle_cp_msg(TrexStatelessCpToRxMsgBase *msg) { @@ -71,32 +72,94 @@ void CRxCoreStateless::idle_state_loop() { } void CRxCoreStateless::start() { - static int count = 0; - static int i = 0; - - while (true) { - if (m_state == STATE_WORKING) { - count += try_rx(); - i++; - if (i == 100) { - i = 0; - // if no packets in 100 cycles, sleep for a while to spare the cpu - if (count == 0) { - delay(1); - } - count = 0; - periodic_check_for_cp_messages(); - } - } else { - idle_state_loop(); - } -#if 0 - ??? do we need this? - if ( m_core->is_terminated_by_master() ) { - break; - } -#endif - } + static int count = 0; + static int i = 0; + bool do_try_rx_queue =CGlobalInfo::m_options.preview.get_vm_one_queue_enable() ? true : false; + + while (true) { + if (m_state == STATE_WORKING) { + i++; + if (i == 100) { + i = 0; + // if no packets in 100 cycles, sleep for a while to spare the cpu + if (count == 0) { + delay(1); + } + count = 0; + periodic_check_for_cp_messages(); // m_state might change in here + } + } else { + if (m_state == STATE_QUIT) + break; + idle_state_loop(); + } + if (do_try_rx_queue) { + try_rx_queues(); + } + count += try_rx(); + } +} + +void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPort *lp, rte_mbuf_t *m) { + Cxl710Parser parser; + + if (parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) { + uint16_t ip_id; + if (parser.get_ip_id(ip_id) == 0) { + if (is_flow_stat_id(ip_id)) { + uint16_t hw_id = get_hw_id(ip_id); + lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1); + lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len); + } + } + } +} + +// In VM setup, handle packets coming as messages from DP cores. +void CRxCoreStateless::handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r) { + while ( true ) { + CGenNode * node; + if ( r->Dequeue(node) != 0 ) { + break; + } + assert(node); + + CGenNodeMsgBase * msg = (CGenNodeMsgBase *)node; + CGenNodeLatencyPktInfo * l_msg; + uint8_t msg_type = msg->m_msg_type; + uint8_t rx_port_index; + CLatencyManagerPerPort * lp; + + switch (msg_type) { + case CGenNodeMsgBase::LATENCY_PKT: + l_msg = (CGenNodeLatencyPktInfo *)msg; + assert(l_msg->m_latency_offset == 0xdead); + rx_port_index = (thread_id << 1) + (l_msg->m_dir & 1); + assert( rx_port_index < m_max_ports ); + lp = &m_ports[rx_port_index]; + handle_rx_pkt(lp, (rte_mbuf_t *)l_msg->m_pkt); + break; + default: + printf("ERROR latency-thread message type is not valid %d \n", msg_type); + assert(0); + } + + CGlobalInfo::free_node(node); + } +} + +// VM mode function. Handle messages from DP +void CRxCoreStateless::try_rx_queues() { + + CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp(); + uint8_t threads=CMsgIns::Ins()->get_num_threads(); + int ti; + for (ti = 0; ti < (int)threads; ti++) { + CNodeRing * r = rx_dp->getRingDpToCp(ti); + if ( ! r->isEmpty() ) { + handle_rx_queue_msgs((uint8_t)ti, r); + } + } } int CRxCoreStateless::try_rx() { @@ -105,26 +168,19 @@ int CRxCoreStateless::try_rx() { for (i = 0; i < m_max_ports; i++) { CLatencyManagerPerPort * lp = &m_ports[i]; rte_mbuf_t * m; + m_cpu_dp_u.start_work(); /* try to read 64 packets clean up the queue */ uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64); total_pkts += cnt_p; if (cnt_p) { int j; for (j = 0; j < cnt_p; j++) { - Cxl710Parser parser; m = rx_pkts[j]; - if (parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) { - uint16_t ip_id; - if (parser.get_ip_id(ip_id) == 0) { - if (is_flow_stat_id(ip_id)) { - uint16_t hw_id = get_hw_id(ip_id); - m_ports[i].m_port.m_rx_pg_bytes[hw_id] += m->pkt_len; - m_ports[i].m_port.m_rx_pg_pkts[hw_id]++; - } - } - } + handle_rx_pkt(lp, m); rte_pktmbuf_free(m); } + /* commit only if there was work to do ! */ + m_cpu_dp_u.commit(); }/* if work */ }// all ports return total_pkts; @@ -141,19 +197,21 @@ uint16_t CRxCoreStateless::get_hw_id(uint16_t id) { void CRxCoreStateless::reset_rx_stats(uint8_t port_id) { for (int hw_id = 0; hw_id < MAX_FLOW_STATS; hw_id++) { - m_ports[port_id].m_port.m_rx_pg_bytes[hw_id] = 0; - m_ports[port_id].m_port.m_rx_pg_pkts[hw_id] = 0; + m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear(); } } -int CRxCoreStateless::get_rx_stats(uint8_t port_id, uint32_t *pkts, uint32_t *prev_pkts - , uint32_t *bytes, uint32_t *prev_bytes, int min, int max) { +int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset) { for (int hw_id = min; hw_id <= max; hw_id++) { - pkts[hw_id] = m_ports[port_id].m_port.m_rx_pg_pkts[hw_id] - prev_pkts[hw_id]; - prev_pkts[hw_id] = m_ports[port_id].m_port.m_rx_pg_pkts[hw_id]; - bytes[hw_id] = m_ports[port_id].m_port.m_rx_pg_bytes[hw_id] - prev_bytes[hw_id]; - prev_bytes[hw_id] = m_ports[port_id].m_port.m_rx_pg_bytes[hw_id]; + rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat[hw_id]; + if (reset) { + m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear(); + } } - return 0; } + +double CRxCoreStateless::get_cpu_util() { + m_cpu_cp_u.Update(); + return m_cpu_cp_u.GetVal(); +} diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index eecc8033..5ab12f4e 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -1,27 +1,28 @@ /* - Ido Barnea - Cisco Systems, Inc. + Ido Barnea + Cisco Systems, Inc. */ /* -Copyright (c) 2016-2016 Cisco Systems, Inc. + Copyright (c) 2016-2016 Cisco Systems, Inc. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ #ifndef __TREX_STATELESS_RX_CORE_H__ #define __TREX_STATELESS_RX_CORE_H__ #include <stdint.h> #include "latency.h" +#include "utl_cpuu.h" class TrexStatelessCpToRxMsgBase; @@ -34,7 +35,7 @@ class CRxSlCfg { public: uint32_t m_max_ports; - double m_cps;// CPS + double m_cps; CPortLatencyHWBase * m_ports[TREX_MAX_PORTS]; }; @@ -42,21 +43,27 @@ class CRxCoreStateless { enum state_e { STATE_IDLE, STATE_WORKING, + STATE_QUIT }; public: void start(); void create(const CRxSlCfg &cfg); void reset_rx_stats(uint8_t port_id); - int get_rx_stats(uint8_t port_id, uint32_t *pkts, uint32_t *prev_pkts - , uint32_t *bytes, uint32_t *prev_bytes, int min, int max); + int get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset); void work() {m_state = STATE_WORKING;} void idle() {m_state = STATE_IDLE;} + void quit() {m_state = STATE_QUIT;} + double get_cpu_util(); + private: void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg); bool periodic_check_for_cp_messages(); void idle_state_loop(); + void handle_rx_pkt(CLatencyManagerPerPort * lp, rte_mbuf_t * m); + void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r); int try_rx(); + void try_rx_queues(); bool is_flow_stat_id(uint16_t id); uint16_t get_hw_id(uint16_t id); @@ -64,8 +71,10 @@ class CRxCoreStateless { uint32_t m_max_ports; bool m_has_streams; CLatencyManagerPerPort m_ports[TREX_MAX_PORTS]; - state_e m_state; /* state of all ports */ - CNodeRing *m_ring_from_cp; - CNodeRing *m_ring_to_cp; + state_e m_state; /* state of all ports */ + CNodeRing *m_ring_from_cp; + CNodeRing *m_ring_to_cp; + CCpuUtlDp m_cpu_dp_u; + CCpuUtlCp m_cpu_cp_u; }; #endif |