diff options
author | 2015-11-19 12:49:10 +0200 | |
---|---|---|
committer | 2015-11-19 12:49:10 +0200 | |
commit | 76248b13906f575f709ed2270d63ec41131f4bdf (patch) | |
tree | 6b5313815b6181aed313ea6fc58b6ec311d554ea /src/stateless | |
parent | a7317d45787669af71ca8c65fd1e51f8a47d2c1e (diff) | |
parent | 91a4e6cc117076d3f5d34437581f7ffe91e6892b (diff) |
Merge branch 'master' of csi-sceasr-b45:/auto/proj-pcube-b/apps/PL-b/tools/repo//trex-core
Conflicts:
src/stateless/cp/trex_streams_compiler.h
src/stateless/dp/trex_stateless_dp_core.cpp
src/stateless/dp/trex_stateless_dp_core.h
src/stateless/messaging/trex_stateless_messaging.cpp
src/stateless/messaging/trex_stateless_messaging.h
Diffstat (limited to 'src/stateless')
-rw-r--r-- | src/stateless/cp/trex_dp_port_events.cpp | 215 | ||||
-rw-r--r-- | src/stateless/cp/trex_dp_port_events.h | 165 | ||||
-rw-r--r-- | src/stateless/cp/trex_stream.cpp | 65 | ||||
-rw-r--r-- | src/stateless/cp/trex_stream.h | 21 | ||||
-rw-r--r-- | src/stateless/cp/trex_streams_compiler.cpp | 64 | ||||
-rw-r--r-- | src/stateless/cp/trex_streams_compiler.h | 14 | ||||
-rw-r--r-- | src/stateless/dp/trex_stateless_dp_core.cpp | 330 | ||||
-rw-r--r-- | src/stateless/dp/trex_stateless_dp_core.h | 92 | ||||
-rw-r--r-- | src/stateless/dp/trex_stream_node.h | 101 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.cpp | 20 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.h | 32 |
11 files changed, 1048 insertions, 71 deletions
diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp new file mode 100644 index 00000000..533ab605 --- /dev/null +++ b/src/stateless/cp/trex_dp_port_events.cpp @@ -0,0 +1,215 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include <trex_dp_port_events.h> +#include <sstream> +#include <os_time.h> +#include <trex_stateless.h> + +/** + * port events + */ +void +TrexDpPortEvents::create(TrexStatelessPort *port) { + m_port = port; + + for (int i = 0; i < TrexDpPortEvent::EVENT_MAX; i++) { + m_events[i].create((TrexDpPortEvent::event_e) i, port); + } + + m_event_id_counter = EVENT_ID_INVALID; +} + +/** + * generate a new event ID + * + */ +int +TrexDpPortEvents::generate_event_id() { + return (++m_event_id_counter); +} + +/** + * mark the next allowed event + * all other events will be disabled + * + */ +void +TrexDpPortEvents::wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms) { + + /* first disable all events */ + for (TrexDpPortEvent & e : m_events) { + e.disable(); + } + + /* mark this event as allowed */ + m_events[ev].wait_for_event(event_id, timeout_ms); +} + +/** + * handle an event + * + */ +void +TrexDpPortEvents::handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id) { + m_events[ev].handle_event(thread_id, event_id); +} + +/*********** + * single event object + * + */ + +void +TrexDpPortEvent::create(event_e type, TrexStatelessPort *port) { + m_event_type = type; + m_port = port; + + /* add the core ids to the hash */ + m_signal.clear(); + for (int core_id : m_port->get_core_id_list()) { + m_signal[core_id] = false; + } + + /* event is disabled */ + disable(); +} + + +/** + * wait the event using event id and timeout + * + */ +void +TrexDpPortEvent::wait_for_event(int event_id, int timeout_ms) { + + /* set a new event id */ + m_event_id = event_id; + + /* do we have a timeout ? */ + if (timeout_ms > 0) { + m_expire_limit_ms = os_get_time_msec() + timeout_ms; + } else { + m_expire_limit_ms = -1; + } + + /* prepare the signal array */ + m_pending_cnt = 0; + for (auto & core_pair : m_signal) { + core_pair.second = false; + m_pending_cnt++; + } +} + +void +TrexDpPortEvent::disable() { + m_event_id = TrexDpPortEvents::EVENT_ID_INVALID; +} + +/** + * get the event status + * + */ + +TrexDpPortEvent::event_status_e +TrexDpPortEvent::status() { + + /* is it even active ? */ + if (m_event_id == TrexDpPortEvents::EVENT_ID_INVALID) { + return (EVENT_DISABLE); + } + + /* did it occured ? */ + if (m_pending_cnt == 0) { + return (EVENT_OCCURED); + } + + /* so we are enabled and the event did not occur - maybe we timed out ? */ + if ( (m_expire_limit_ms > 0) && (os_get_time_msec() > m_expire_limit_ms) ) { + return (EVENT_TIMED_OUT); + } + + /* so we are still waiting... */ + return (EVENT_PENDING); + +} + +void +TrexDpPortEvent::err(int thread_id, int event_id, const std::string &err_msg) { + std::stringstream err; + err << "DP event '" << event_name(m_event_type) << "' on thread id '" << thread_id << "' with key '" << event_id <<"' - "; +} + +/** + * event occured + * + */ +void +TrexDpPortEvent::handle_event(int thread_id, int event_id) { + + /* if the event is disabled - we don't care */ + if (!is_active()) { + return; + } + + /* check the event id is matching the required event */ + if (event_id != m_event_id) { + err(thread_id, event_id, "event key mismatch"); + } + + /* mark sure no double signal */ + if (m_signal.at(thread_id)) { + err(thread_id, event_id, "double signal"); + + } else { + /* mark */ + m_signal.at(thread_id) = true; + m_pending_cnt--; + } + + /* event occured */ + if (m_pending_cnt == 0) { + m_port->on_dp_event_occured(m_event_type); + m_event_id = TrexDpPortEvents::EVENT_ID_INVALID; + } +} + +bool +TrexDpPortEvent::is_active() { + return (status() != EVENT_DISABLE); +} + +bool +TrexDpPortEvent::has_timeout_expired() { + return (status() == EVENT_TIMED_OUT); +} + +const char * +TrexDpPortEvent::event_name(event_e type) { + switch (type) { + case EVENT_STOP: + return "DP STOP"; + + default: + throw TrexException("unknown event type"); + } + +} diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h new file mode 100644 index 00000000..309288df --- /dev/null +++ b/src/stateless/cp/trex_dp_port_events.h @@ -0,0 +1,165 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_DP_PORT_EVENTS_H__ +#define __TREX_DP_PORT_EVENTS_H__ + +#include <unordered_map> +#include <string> + +class TrexStatelessPort; + +/** + * describes a single DP event related to port + * + * @author imarom (18-Nov-15) + */ +class TrexDpPortEvent { +public: + + enum event_e { + EVENT_STOP = 1, + EVENT_MAX + }; + + /** + * status of the event for the port + */ + enum event_status_e { + EVENT_DISABLE, + EVENT_PENDING, + EVENT_TIMED_OUT, + EVENT_OCCURED + }; + + /** + * init for the event + * + */ + void create(event_e type, TrexStatelessPort *port); + + /** + * create a new pending event + * + */ + void wait_for_event(int event_id, int timeout_ms = -1); + + /** + * mark event as not allowed to happen + * + */ + void disable(); + + /** + * get the event status + * + */ + event_status_e status(); + + /** + * event occured + * + */ + void handle_event(int thread_id, int event_id); + + /** + * returns true if event is active + * + */ + bool is_active(); + + /** + * has timeout already expired ? + * + */ + bool has_timeout_expired(); + + /** + * generate error + * + */ + void err(int thread_id, int event_id, const std::string &err_msg); + + /** + * event to name + * + */ + static const char * event_name(event_e type); + + +private: + + event_e m_event_type; + std::unordered_map<int, bool> m_signal; + int m_pending_cnt; + + TrexStatelessPort *m_port; + int m_event_id; + int m_expire_limit_ms; + +}; + +/** + * all the events related to a port + * + */ +class TrexDpPortEvents { +public: + friend class TrexDpPortEvent; + + void create(TrexStatelessPort *port); + + /** + * generate a new event ID to be used with wait_for_event + * + */ + int generate_event_id(); + + /** + * wait a new DP event on the port + * returns a key which will be used to identify + * the event happened + * + * @author imarom (18-Nov-15) + * + * @param ev - type of event + * @param event_id - a unique identifier for the event + * @param timeout_ms - does it has a timeout ? + * + */ + void wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms = -1); + + /** + * event has occured + * + */ + void handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id); + +private: + static const int EVENT_ID_INVALID = -1; + + TrexDpPortEvent m_events[TrexDpPortEvent::EVENT_MAX]; + int m_event_id_counter; + + TrexStatelessPort *m_port; + +}; + +#endif /* __TREX_DP_PORT_EVENTS_H__ */ diff --git a/src/stateless/cp/trex_stream.cpp b/src/stateless/cp/trex_stream.cpp index 1a05257c..5203b2a2 100644 --- a/src/stateless/cp/trex_stream.cpp +++ b/src/stateless/cp/trex_stream.cpp @@ -25,6 +25,71 @@ limitations under the License. /************************************** * stream *************************************/ + + +std::string TrexStream::get_stream_type_str(stream_type_t stream_type){ + + std::string res; + + + switch (stream_type) { + + case stCONTINUOUS : + res="stCONTINUOUS "; + break; + + case stSINGLE_BURST : + res="stSINGLE_BURST "; + break; + + case stMULTI_BURST : + res="stMULTI_BURST "; + break; + default: + res="Unknow "; + }; + return(res); +} + + +void TrexStream::Dump(FILE *fd){ + + fprintf(fd,"\n"); + fprintf(fd,"==> Stream_id : %lu \n",(ulong)m_stream_id); + fprintf(fd," Enabled : %lu \n",(ulong)(m_enabled?1:0)); + fprintf(fd," Self_start : %lu \n",(ulong)(m_self_start?1:0)); + + if (m_next_stream_id>=0) { + fprintf(fd," Nex_stream_id : %lu \n",(ulong)m_next_stream_id); + }else { + fprintf(fd," Nex_stream_id : %d \n",m_next_stream_id); + } + + fprintf(fd," Port_id : %lu \n",(ulong)m_port_id); + + if (m_isg_usec>0.0) { + fprintf(fd," isg : %6.2f \n",m_isg_usec); + } + fprintf(fd," type : %s \n",get_stream_type_str(m_type).c_str()); + + if ( m_type == TrexStream::stCONTINUOUS ) { + fprintf(fd," pps : %f \n",m_pps); + } + if (m_type == TrexStream::stSINGLE_BURST) { + fprintf(fd," pps : %f \n",m_pps); + fprintf(fd," burst : %lu \n",(ulong)m_burst_total_pkts); + } + if (m_type == TrexStream::stMULTI_BURST) { + fprintf(fd," pps : %f \n",m_pps); + fprintf(fd," burst : %lu \n",(ulong)m_burst_total_pkts); + fprintf(fd," mburst : %lu \n",(ulong)m_num_bursts); + if (m_ibg_usec>0.0) { + fprintf(fd," m_ibg_usec : %f \n",m_ibg_usec); + } + } +} + + TrexStream::TrexStream(uint8_t type, uint8_t port_id, uint32_t stream_id) : m_port_id(port_id), m_stream_id(stream_id) { diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h index c2628cc3..0634829e 100644 --- a/src/stateless/cp/trex_stream.h +++ b/src/stateless/cp/trex_stream.h @@ -1,5 +1,6 @@ /* Itay Marom + Hanoch Haim Cisco Systems, Inc. */ @@ -65,6 +66,9 @@ public: stMULTI_BURST = 6 }; + typedef uint8_t stream_type_t ; + + static std::string get_stream_type_str(stream_type_t stream_type); public: TrexStream(uint8_t type,uint8_t port_id, uint32_t stream_id); @@ -80,6 +84,12 @@ public: /* access the stream json */ const Json::Value & get_stream_json(); + /* compress the stream id to be zero based */ + void fix_dp_stream_id(uint32_t my_stream_id,int next_stream_id){ + m_stream_id = my_stream_id; + m_next_stream_id = next_stream_id; + } + double get_pps() { return m_pps; } @@ -96,6 +106,14 @@ public: return ( m_type ); } + bool is_dp_next_stream(){ + if (m_next_stream_id<0) { + return (false); + }else{ + return (true); + } + } + void set_multi_burst(uint32_t burst_total_pkts, @@ -132,11 +150,12 @@ public: return (dp); } + void Dump(FILE *fd); public: /* basic */ uint8_t m_type; uint8_t m_port_id; - uint32_t m_stream_id; + uint32_t m_stream_id; /* id from RPC can be anything */ /* config fields */ diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp index 0c3b4ef0..bdfc3c01 100644 --- a/src/stateless/cp/trex_streams_compiler.cpp +++ b/src/stateless/cp/trex_streams_compiler.cpp @@ -36,6 +36,7 @@ class GraphNode { public: GraphNode(TrexStream *stream, GraphNode *next) : m_stream(stream), m_next(next) { marked = false; + m_compressed_stream_id=-1; } uint32_t get_stream_id() const { @@ -46,6 +47,7 @@ public: GraphNode *m_next; std::vector<const GraphNode *> m_parents; bool marked; + int m_compressed_stream_id; }; /** @@ -143,8 +145,10 @@ TrexStreamsCompiledObj::~TrexStreamsCompiledObj() { m_objs.clear(); } + void -TrexStreamsCompiledObj::add_compiled_stream(TrexStream * stream) { +TrexStreamsCompiledObj::add_compiled_stream(TrexStream * stream){ + obj_st obj; obj.m_stream = stream->clone_as_dp(); @@ -152,6 +156,26 @@ TrexStreamsCompiledObj::add_compiled_stream(TrexStream * stream) { m_objs.push_back(obj); } +void +TrexStreamsCompiledObj::add_compiled_stream(TrexStream * stream, + uint32_t my_dp_id, int next_dp_id) { + obj_st obj; + + obj.m_stream = stream->clone_as_dp(); + /* compress the id's*/ + obj.m_stream->fix_dp_stream_id(my_dp_id,next_dp_id); + + m_objs.push_back(obj); +} + +void TrexStreamsCompiledObj::Dump(FILE *fd){ + for (auto obj : m_objs) { + obj.m_stream->Dump(fd); + } +} + + + TrexStreamsCompiledObj * TrexStreamsCompiledObj::clone() { @@ -197,6 +221,8 @@ void TrexStreamsCompiler::allocate_pass(const std::vector<TrexStream *> &streams, GraphNodeMap *nodes) { std::stringstream ss; + uint32_t compressed_stream_id=0; + /* first pass - allocate all nodes and check for duplicates */ for (auto stream : streams) { @@ -216,6 +242,10 @@ TrexStreamsCompiler::allocate_pass(const std::vector<TrexStream *> &streams, } GraphNode *node = new GraphNode(stream, NULL); + /* allocate new compressed id */ + node->m_compressed_stream_id = compressed_stream_id; + + compressed_stream_id++; /* add to the map */ assert(nodes->add(node)); @@ -323,9 +353,8 @@ TrexStreamsCompiler::check_for_unreachable_streams(GraphNodeMap *nodes) { * @return bool */ void -TrexStreamsCompiler::pre_compile_check(const std::vector<TrexStream *> &streams) { - - GraphNodeMap nodes; +TrexStreamsCompiler::pre_compile_check(const std::vector<TrexStream *> &streams, + GraphNodeMap & nodes) { m_warnings.clear(); @@ -348,9 +377,20 @@ TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj, std::string *fail_msg) { +#if 0 + fprintf(stdout,"------------pre compile \n"); + for (auto stream : streams) { + stream->Dump(stdout); + } + fprintf(stdout,"------------pre compile \n"); +#endif + + GraphNodeMap nodes; + + /* compile checks */ try { - pre_compile_check(streams); + pre_compile_check(streams,nodes); } catch (const TrexException &ex) { if (fail_msg) { *fail_msg = ex.what(); @@ -360,6 +400,7 @@ TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams, return false; } + /* for now we do something trivial, */ for (auto stream : streams) { @@ -368,8 +409,19 @@ TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams, continue; } + int new_id= nodes.get(stream->m_stream_id)->m_compressed_stream_id; + assert(new_id>=0); + uint32_t my_stream_id = (uint32_t)new_id; + int my_next_stream_id=-1; + if (stream->m_next_stream_id>=0) { + my_next_stream_id=nodes.get(stream->m_next_stream_id)->m_compressed_stream_id; + } + /* add it */ - obj.add_compiled_stream(stream); + obj.add_compiled_stream(stream, + my_stream_id, + my_next_stream_id + ); } return true; diff --git a/src/stateless/cp/trex_streams_compiler.h b/src/stateless/cp/trex_streams_compiler.h index c80dddef..200f7ce9 100644 --- a/src/stateless/cp/trex_streams_compiler.h +++ b/src/stateless/cp/trex_streams_compiler.h @@ -50,6 +50,10 @@ public: return m_objs; } + uint8_t get_port_id(){ + return (m_port_id); + } + /** * clone the compiled object * @@ -60,12 +64,13 @@ public: return (m_mul); } - uint8_t get_port_id() { - return m_port_id; - } + void Dump(FILE *fd); private: + void add_compiled_stream(TrexStream * stream, + uint32_t my_dp_id, int next_dp_id); void add_compiled_stream(TrexStream * stream); + std::vector<obj_st> m_objs; uint8_t m_port_id; @@ -94,7 +99,8 @@ public: private: - void pre_compile_check(const std::vector<TrexStream *> &streams); + void pre_compile_check(const std::vector<TrexStream *> &streams, + GraphNodeMap & nodes); void allocate_pass(const std::vector<TrexStream *> &streams, GraphNodeMap *nodes); void direct_pass(GraphNodeMap *nodes); void check_for_unreachable_streams(GraphNodeMap *nodes); diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index 4a74d9e5..cde34a4b 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -1,5 +1,6 @@ /* Itay Marom + Hanoch Haim Cisco Systems, Inc. */ @@ -26,12 +27,83 @@ limitations under the License. #include <bp_sim.h> -static inline double -usec_to_sec(double usec) { - return (usec / (1000 * 1000)); + +void CDpOneStream::Delete(CFlowGenListPerThread * core){ + assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE); + core->free_node((CGenNode *)m_node); + delete m_dp_stream; + m_node=0; + m_dp_stream=0; +} + +void CDpOneStream::DeleteOnlyStream(){ + assert(m_dp_stream); + delete m_dp_stream; + m_dp_stream=0; +} + +int CGenNodeStateless::get_stream_id(){ + if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) { + return (-1); // not valid + } + assert(m_ref_stream_info); + return ((int)m_ref_stream_info->m_stream_id); +} + + +void CGenNodeStateless::DumpHeader(FILE *fd){ + fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n"); + +} +void CGenNodeStateless::Dump(FILE *fd){ + fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu \n", + m_time, + (ulong)m_port_id, + "s-pkt", //action + get_stream_state_str(m_state ).c_str(), + get_stream_id(), //stream_id + TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype + (ulong)m_multi_bursts, + (ulong)m_single_burst + ); +} + + +void CGenNodeStateless::refresh(){ + + /* refill the stream info */ + m_single_burst = m_single_burst_refill; + m_multi_bursts = m_ref_stream_info->m_num_bursts; + m_state = CGenNodeStateless::ss_ACTIVE; +} + + + +void CGenNodeCommand::free_command(){ + assert(m_cmd); + delete m_cmd; } +std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){ + std::string res; + + switch (stream_state) { + case CGenNodeStateless::ss_FREE_RESUSE : + res="FREE "; + break; + case CGenNodeStateless::ss_INACTIVE : + res="INACTIVE "; + break; + case CGenNodeStateless::ss_ACTIVE : + res="ACTIVE "; + break; + default: + res="Unknow "; + }; + return(res); +} + void CGenNodeStateless::free_stl_node(){ /* if we have cache mbuf free it */ @@ -43,11 +115,54 @@ void CGenNodeStateless::free_stl_node(){ } +bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){ + m_active_streams-=d; /* reduce the number of streams */ + if (m_active_streams == 0) { + return (true); + } + return (false); +} + + +void TrexStatelessDpPerPort::stop_traffic(uint8_t port_id){ + + assert(m_state==TrexStatelessDpPerPort::ppSTATE_TRANSMITTING); + + for (auto dp_stream : m_active_nodes) { + CGenNodeStateless * node =dp_stream.m_node; + assert(node->get_port_id() == port_id); + if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) { + node->mark_for_free(); + m_active_streams--; + dp_stream.DeleteOnlyStream(); + + }else{ + dp_stream.Delete(m_core); + } + } + + /* active stream should be zero */ + assert(m_active_streams==0); + m_active_nodes.clear(); + m_state=TrexStatelessDpPerPort::ppSTATE_IDLE; +} + + +void TrexStatelessDpPerPort::create(CFlowGenListPerThread * core){ + m_core=core; + m_state=TrexStatelessDpPerPort::ppSTATE_IDLE; + m_port_id=0; + m_active_streams=0; + m_active_nodes.clear(); +} + + void TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) { m_thread_id = thread_id; m_core = core; + m_local_port_offset = 2*core->getDualPortId(); CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp(); @@ -55,8 +170,54 @@ TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) { m_ring_to_cp = cp_dp->getRingDpToCp(thread_id); m_state = STATE_IDLE; + + int i; + for (i=0; i<NUM_PORTS_PER_CORE; i++) { + m_ports[i].create(core); + } } + +/* move to the next stream, old stream move to INACTIVE */ +bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node, + CGenNodeStateless * next_node){ + + assert(cur_node); + TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id); + bool schedule =false; + + bool to_stop_port=false; + + if (next_node == NULL) { + /* there is no next stream , reduce the number of active streams*/ + to_stop_port = lp_port->update_number_of_active_streams(1); + + }else{ + uint8_t state=next_node->get_state(); + + /* can't be FREE_RESUSE */ + assert(state != CGenNodeStateless::ss_FREE_RESUSE); + if (next_node->get_state() == CGenNodeStateless::ss_INACTIVE ) { + + /* refill start info and scedule, no update in active streams */ + next_node->refresh(); + schedule = true; + + }else{ + to_stop_port = lp_port->update_number_of_active_streams(1); + } + } + + if ( to_stop_port ) { + /* call stop port explictly to move the state */ + stop_traffic(cur_node->m_port_id); + } + + return ( schedule ); +} + + + /** * in idle state loop, the processor most of the time sleeps * and periodically checks for messages @@ -76,7 +237,8 @@ TrexStatelessDpCore::idle_state_loop() { void TrexStatelessDpCore::quit_main_loop(){ m_core->set_terminate_mode(true); /* mark it as terminated */ - add_duration(0.0001); /* add message to terminate */ + m_state = STATE_TERMINATE; + add_global_duration(0.0001); } @@ -97,6 +259,7 @@ TrexStatelessDpCore::start_scheduler() { double old_offset = 0.0; m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset); + /* TBD do we need that ? */ m_core->m_node_gen.close_file(m_core); } @@ -105,6 +268,11 @@ void TrexStatelessDpCore::run_once(){ idle_state_loop(); + + if ( m_state == STATE_TERMINATE ){ + return; + } + start_scheduler(); } @@ -121,8 +289,25 @@ TrexStatelessDpCore::start() { } } -void -TrexStatelessDpCore::add_duration(double duration){ +/* only if both port are idle we can exit */ +void +TrexStatelessDpCore::schedule_exit(){ + + CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ; + + node->m_type = CGenNode::COMMAND; + + node->m_cmd = new TrexStatelessDpCanQuit(); + + /* make sure it will be scheduled after the current node */ + node->m_time = m_core->m_cur_time_sec ; + + m_core->m_node_gen.add_node((CGenNode *)node); +} + + +void +TrexStatelessDpCore::add_global_duration(double duration){ if (duration > 0.0) { CGenNode *node = m_core->create_node() ; @@ -135,9 +320,28 @@ TrexStatelessDpCore::add_duration(double duration){ } } +/* add per port exit */ +void +TrexStatelessDpCore::add_port_duration(double duration, + uint8_t port_id){ + if (duration > 0.0) { + CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ; + + node->m_type = CGenNode::COMMAND; + + /* make sure it will be scheduled after the current node */ + node->m_time = m_core->m_cur_time_sec + duration ; + + node->m_cmd = new TrexStatelessDpStop(port_id); + + m_core->m_node_gen.add_node((CGenNode *)node); + } +} + void -TrexStatelessDpCore::add_cont_stream(TrexStream * stream, +TrexStatelessDpCore::add_cont_stream(TrexStatelessDpPerPort * lp_port, + TrexStream * stream, TrexStreamsCompiledObj *comp) { CGenNodeStateless *node = m_core->create_node_sl(); @@ -145,6 +349,19 @@ TrexStatelessDpCore::add_cont_stream(TrexStream * stream, /* add periodic */ node->m_type = CGenNode::STATELESS_PKT; + node->m_ref_stream_info = stream->clone_as_dp(); + + node->m_next_stream=0; /* will be fixed later */ + + + if ( stream->m_self_start ){ + /* if self start it is in active mode */ + node->m_state =CGenNodeStateless::ss_ACTIVE; + lp_port->m_active_streams++; + }else{ + node->m_state =CGenNodeStateless::ss_INACTIVE; + } + node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec); pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id); @@ -166,6 +383,10 @@ TrexStatelessDpCore::add_cont_stream(TrexStream * stream, switch ( stream->m_type ) { case TrexStream::stCONTINUOUS : + node->m_single_burst=0; + node->m_single_burst_refill=0; + node->m_multi_bursts=0; + node->m_ibg_sec = 0.0; break; case TrexStream::stSINGLE_BURST : @@ -187,7 +408,6 @@ TrexStatelessDpCore::add_cont_stream(TrexStream * stream, assert(0); }; - node->m_is_stream_active = 1; node->m_port_id = stream->m_port_id; /* allocate const mbuf */ @@ -208,58 +428,94 @@ TrexStatelessDpCore::add_cont_stream(TrexStream * stream, /* set the packet as a readonly */ node->set_cache_mbuf(m); - /* keep track */ - m_active_nodes.push_back(node); + CDpOneStream one_stream; - /* schedule */ - m_core->m_node_gen.add_node((CGenNode *)node); + one_stream.m_dp_stream = node->m_ref_stream_info; + one_stream.m_node =node; - m_state = TrexStatelessDpCore::STATE_TRANSMITTING; + lp_port->m_active_nodes.push_back(one_stream); + /* schedule only if active */ + if (node->m_state == CGenNodeStateless::ss_ACTIVE) { + m_core->m_node_gen.add_node((CGenNode *)node); + } } void -TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj, double duration) { +TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj, + double duration) { + +#if 0 + /* TBD to remove ! */ + obj->Dump(stdout); +#endif + + TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id()); + lp_port->m_active_streams = 0; + /* no nodes in the list */ + assert(lp_port->m_active_nodes.size()==0); + + for (auto single_stream : obj->get_objects()) { + /* all commands should be for the same port */ + assert(obj->get_port_id() == single_stream.m_stream->m_port_id); + add_cont_stream(lp_port,single_stream.m_stream,obj); + } + + uint32_t nodes = lp_port->m_active_nodes.size(); + /* find next stream */ + assert(nodes == obj->get_objects().size()); + + int cnt=0; + + /* set the next_stream pointer */ for (auto single_stream : obj->get_objects()) { - add_cont_stream(single_stream.m_stream,obj); + + if (single_stream.m_stream->is_dp_next_stream() ) { + int stream_id = single_stream.m_stream->m_next_stream_id; + assert(stream_id<nodes); + /* point to the next stream , stream_id is fixed */ + lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ; + } + cnt++; } + lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING; + m_state = TrexStatelessDpCore::STATE_TRANSMITTING; + + if ( duration > 0.0 ){ - add_duration( duration ); + add_port_duration( duration ,obj->get_port_id() ); } } + +bool TrexStatelessDpCore::are_all_ports_idle(){ + + bool res=true; + int i; + for (i=0; i<NUM_PORTS_PER_CORE; i++) { + if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){ + res=false; + } + } + return (res); +} + + void TrexStatelessDpCore::stop_traffic(uint8_t port_id) { /* we cannot remove nodes not from the top of the queue so for every active node - make sure next time the scheduler invokes it, it will be free */ - for (auto node : m_active_nodes) { - if (node->m_port_id == port_id) { - node->m_is_stream_active = 0; - } - } - /* remove all the non active nodes */ - auto pred = std::remove_if(m_active_nodes.begin(), - m_active_nodes.end(), - [](CGenNodeStateless *node) { return (!node->m_is_stream_active); }); + TrexStatelessDpPerPort * lp_port = get_port_db(port_id); - m_active_nodes.erase(pred, m_active_nodes.end()); + lp_port->stop_traffic(port_id); - if (m_active_nodes.size() == 0) { - m_state = STATE_IDLE; - /* stop the scheduler */ + if ( are_all_ports_idle() ) { - CGenNode *node = m_core->create_node() ; - - node->m_type = CGenNode::EXIT_SCHED; - - /* make sure it will be scheduled after the current node */ - node->m_time = m_core->m_cur_time_sec + 0.0001; - - m_core->m_node_gen.add_node(node); + schedule_exit(); } /* inform the control plane we stopped - this might be a async stop diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h index d07e1d3a..7ee5abc4 100644 --- a/src/stateless/dp/trex_stateless_dp_core.h +++ b/src/stateless/dp/trex_stateless_dp_core.h @@ -1,5 +1,6 @@ /* Itay Marom + Hanoch Haim Cisco Systems, Inc. */ @@ -33,6 +34,52 @@ class CGenNodeStateless; class TrexStreamsCompiledObj; class TrexStream; + +class CDpOneStream { +public: + void Create(){ + } + + void Delete(CFlowGenListPerThread * core); + void DeleteOnlyStream(); + + CGenNodeStateless * m_node; // schedule node + TrexStream * m_dp_stream; // stream info +}; + +class TrexStatelessDpPerPort { + +public: + /* states */ + enum state_e { + ppSTATE_IDLE, + ppSTATE_TRANSMITTING + }; + +public: + TrexStatelessDpPerPort(){ + } + + void create(CFlowGenListPerThread * core); + + void stop_traffic(uint8_t port_id); + + bool update_number_of_active_streams(uint32_t d); + +public: + + state_e m_state; + uint8_t m_port_id; + + uint32_t m_active_streams; /* how many active streams on this port */ + + std::vector<CDpOneStream> m_active_nodes; /* holds the current active nodes */ + CFlowGenListPerThread * m_core ; +}; + +/* for now */ +#define NUM_PORTS_PER_CORE 2 + class TrexStatelessDpCore { public: @@ -40,7 +87,9 @@ public: /* states */ enum state_e { STATE_IDLE, - STATE_TRANSMITTING + STATE_TRANSMITTING, + STATE_TERMINATE + }; TrexStatelessDpCore() { @@ -83,6 +132,11 @@ public: */ void stop_traffic(uint8_t port_id); + + /* return if all ports are idel */ + bool are_all_ports_idle(); + + /** * check for and handle messages from CP * @@ -120,7 +174,23 @@ public: return m_event_id; } + bool set_stateless_next_node(CGenNodeStateless * cur_node, + CGenNodeStateless * next_node); + private: + + + TrexStatelessDpPerPort * get_port_db(uint8_t port_id){ + assert((m_local_port_offset==port_id) ||(m_local_port_offset+1==port_id)); + uint8_t local_port_id = port_id -m_local_port_offset; + assert(local_port_id<NUM_PORTS_PER_CORE); + return (&m_ports[local_port_id]); + } + + + void schedule_exit(); + + /** * in idle state loop, the processor most of the time sleeps * and periodically checks for messages @@ -143,21 +213,27 @@ private: */ void handle_cp_msg(TrexStatelessCpToDpMsgBase *msg); - /* add global exit */ - void add_duration(double duration); - void add_cont_stream(TrexStream * stream,TrexStreamsCompiledObj *comp); + void add_port_duration(double duration, + uint8_t port_id); + + void add_global_duration(double duration); + + void add_cont_stream(TrexStatelessDpPerPort * lp_port, + TrexStream * stream, + TrexStreamsCompiledObj *comp); uint8_t m_thread_id; - state_e m_state; + uint8_t m_local_port_offset; + + state_e m_state; /* state of all ports */ CNodeRing *m_ring_from_cp; CNodeRing *m_ring_to_cp; - /* holds the current active nodes */ - std::vector<CGenNodeStateless *> m_active_nodes; + TrexStatelessDpPerPort m_ports[NUM_PORTS_PER_CORE]; /* pointer to the main object */ - CFlowGenListPerThread *m_core; + CFlowGenListPerThread * m_core; double m_duration; int m_event_id; diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h index e4cf964d..1e53887b 100644 --- a/src/stateless/dp/trex_stream_node.h +++ b/src/stateless/dp/trex_stream_node.h @@ -1,5 +1,6 @@ /* Itay Marom + Hanoh Haim Cisco Systems, Inc. */ @@ -27,20 +28,51 @@ limitations under the License. class TrexStatelessDpCore; #include <trex_stream.h> +class TrexStatelessCpToDpMsgBase; + +struct CGenNodeCommand : public CGenNodeBase { + +friend class TrexStatelessDpCore; + +public: + TrexStatelessCpToDpMsgBase * m_cmd; + + uint8_t m_pad_end[104]; + +public: + void free_command(); + +} __rte_cache_aligned;; + + +static_assert(sizeof(CGenNodeCommand) == sizeof(CGenNode), "sizeof(CGenNodeCommand) != sizeof(CGenNode)" ); + /* this is a event for stateless */ struct CGenNodeStateless : public CGenNodeBase { friend class TrexStatelessDpCore; +public: + enum { + ss_FREE_RESUSE =1, /* should be free by scheduler */ + ss_INACTIVE =2, /* will be active by other stream or stopped */ + ss_ACTIVE =3 /* the stream is active */ + }; + typedef uint8_t stream_state_t ; + + static std::string get_stream_state_str(stream_state_t stream_state); + private: + /* cache line 0 */ + /* important stuff here */ void * m_cache_mbuf; double m_next_time_offset; /* in sec */ double m_ibg_sec; /* inter burst time in sec */ - uint8_t m_is_stream_active; + stream_state_t m_state; uint8_t m_port_id; - uint8_t m_stream_type; /* TrexStream::STREAM_TYPE */ + uint8_t m_stream_type; /* see TrexStream::STREAM_TYPE ,stream_type_t */ uint8_t m_pad; uint32_t m_single_burst; /* the number of bursts in case of burst */ @@ -48,14 +80,40 @@ private: uint32_t m_multi_bursts; /* in case of multi_burst how many bursts */ - + /* cache line 1 */ + TrexStream * m_ref_stream_info; /* the stream info */ + CGenNodeStateless * m_next_stream; /* pad to match the size of CGenNode */ - uint8_t m_pad_end[65]; + uint8_t m_pad_end[56]; + + public: + uint8_t get_port_id(){ + return (m_port_id); + } + + + /* we restart the stream, schedule it using stream isg */ + inline void update_refresh_time(double cur_time){ + m_time = cur_time + usec_to_sec(m_ref_stream_info->m_isg_usec); + } + + inline bool is_mask_for_free(){ + return (get_state() == CGenNodeStateless::ss_FREE_RESUSE ?true:false); + + } + inline void mark_for_free(){ + set_state(CGenNodeStateless::ss_FREE_RESUSE); + /* only to be safe */ + m_ref_stream_info= NULL; + m_next_stream= NULL; + + } + inline uint8_t get_stream_type(){ return (m_stream_type); } @@ -72,11 +130,16 @@ public: return (m_multi_bursts); } + inline void set_state(stream_state_t new_state){ + m_state=new_state; + } - inline bool is_active() { - return m_is_stream_active; + + inline stream_state_t get_state() { + return m_state; } + void refresh(); inline void handle_continues(CFlowGenListPerThread *thread) { thread->m_node_gen.m_v_if->send_node( (CGenNode *)this); @@ -100,14 +163,22 @@ public: }else{ m_multi_bursts--; if ( m_multi_bursts == 0 ) { - /* stop */ - m_is_stream_active =0; + set_state(CGenNodeStateless::ss_INACTIVE); + if ( thread->set_stateless_next_node(this,m_next_stream) ){ + /* update the next stream time using isg */ + m_next_stream->update_refresh_time(m_time); + + thread->m_node_gen.m_p_queue.push( (CGenNode *)m_next_stream); + }else{ + // in case of zero we will schedule a command to stop + // will be called from set_stateless_next_node + } + }else{ m_time += m_ibg_sec; m_single_burst = m_single_burst_refill; - + thread->m_node_gen.m_p_queue.push( (CGenNode *)this); } - thread->m_node_gen.m_p_queue.push( (CGenNode *)this); } } @@ -168,10 +239,14 @@ public: void free_stl_node(); +public: + /* debug functions */ - void Dump(FILE *fd){ - fprintf(fd," %f, %lu, %lu \n",m_time,(ulong)m_port_id,(ulong)get_mbuf_cache_dir()); - } + int get_stream_id(); + + static void DumpHeader(FILE *fd); + + void Dump(FILE *fd); } __rte_cache_aligned; diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index a2d00f8b..c92ad68a 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -1,5 +1,6 @@ /* Itay Marom + Hanoch Haim Cisco Systems, Inc. */ @@ -98,7 +99,6 @@ TrexStatelessDpQuit::clone(){ } - bool TrexStatelessDpQuit::handle(TrexStatelessDpCore *dp_core){ /* quit */ @@ -106,6 +106,23 @@ bool TrexStatelessDpQuit::handle(TrexStatelessDpCore *dp_core){ return (true); } +bool TrexStatelessDpCanQuit::handle(TrexStatelessDpCore *dp_core){ + + if ( dp_core->are_all_ports_idle() ){ + /* if all ports are idle quit now */ + set_quit(true); + } + return (true); +} + +TrexStatelessCpToDpMsgBase * +TrexStatelessDpCanQuit::clone(){ + + TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpCanQuit(); + + return new_msg; +} + /************************* messages from DP to CP **********************/ bool @@ -115,3 +132,4 @@ TrexDpPortEventMsg::handle() { return (true); } + diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 3fb1ef84..445e9378 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -37,6 +37,7 @@ class TrexStatelessCpToDpMsgBase { public: TrexStatelessCpToDpMsgBase() { + m_quit_scheduler=false; } virtual ~TrexStatelessCpToDpMsgBase() { @@ -51,11 +52,21 @@ public: */ virtual TrexStatelessCpToDpMsgBase * clone() = 0; + /* do we want to quit scheduler, can be set by handle function */ + void set_quit(bool enable){ + m_quit_scheduler=enable; + } + + bool is_quit(){ + return ( m_quit_scheduler); + } + /* no copy constructor */ TrexStatelessCpToDpMsgBase(TrexStatelessCpToDpMsgBase &) = delete; protected: - int m_event_id; + int m_event_id; + bool m_quit_scheduler; }; /** @@ -118,6 +129,24 @@ public: }; +/** + * a message to check if both port are idel and exit + * + * @author hhaim + */ +class TrexStatelessDpCanQuit : public TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessDpCanQuit() { + } + + virtual bool handle(TrexStatelessDpCore *dp_core); + + virtual TrexStatelessCpToDpMsgBase * clone(); +}; + + + /************************* messages from DP to CP **********************/ /** @@ -145,6 +174,7 @@ public: }; + /** * a message indicating an event has happened on a port at the * DP |