diff options
author | Hanoh Haim <hhaim@cisco.com> | 2015-11-18 16:23:55 +0200 |
---|---|---|
committer | Hanoh Haim <hhaim@cisco.com> | 2015-11-18 16:23:55 +0200 |
commit | 0e8c9ae666d61897cb405c469a71be09d54a649b (patch) | |
tree | a35686155fa9f5834acd85ae6f1f4beeeab59b12 /src/stateless/dp | |
parent | aa9bf54e6f892168482ed647a0e67ab10b1cf34a (diff) |
add support for a program of streams. refactor the dp code
Diffstat (limited to 'src/stateless/dp')
-rw-r--r-- | src/stateless/dp/trex_stateless_dp_core.cpp | 328 | ||||
-rw-r--r-- | src/stateless/dp/trex_stateless_dp_core.h | 92 | ||||
-rw-r--r-- | src/stateless/dp/trex_stream_node.h | 101 |
3 files changed, 463 insertions, 58 deletions
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index b25a4cfc..640fdb4d 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -1,5 +1,6 @@ /* Itay Marom + Hanoch Haim Cisco Systems, Inc. */ @@ -26,12 +27,83 @@ limitations under the License. #include <bp_sim.h> -static inline double -usec_to_sec(double usec) { - return (usec / (1000 * 1000)); + +void CDpOneStream::Delete(CFlowGenListPerThread * core){ + assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE); + core->free_node((CGenNode *)m_node); + delete m_dp_stream; + m_node=0; + m_dp_stream=0; +} + +void CDpOneStream::DeleteOnlyStream(){ + assert(m_dp_stream); + delete m_dp_stream; + m_dp_stream=0; +} + +int CGenNodeStateless::get_stream_id(){ + if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) { + return (-1); // not valid + } + assert(m_ref_stream_info); + return ((int)m_ref_stream_info->m_stream_id); +} + + +void CGenNodeStateless::DumpHeader(FILE *fd){ + fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n"); + +} +void CGenNodeStateless::Dump(FILE *fd){ + fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu \n", + m_time, + (ulong)m_port_id, + "s-pkt", //action + get_stream_state_str(m_state ).c_str(), + get_stream_id(), //stream_id + TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype + (ulong)m_multi_bursts, + (ulong)m_single_burst + ); +} + + +void CGenNodeStateless::refresh(){ + + /* refill the stream info */ + m_single_burst = m_single_burst_refill; + m_multi_bursts = m_ref_stream_info->m_num_bursts; + m_state = CGenNodeStateless::ss_ACTIVE; +} + + + +void CGenNodeCommand::free_command(){ + assert(m_cmd); + delete m_cmd; } +std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){ + std::string res; + + switch (stream_state) { + case CGenNodeStateless::ss_FREE_RESUSE : + res="FREE "; + break; + case CGenNodeStateless::ss_INACTIVE : + res="INACTIVE "; + break; + case CGenNodeStateless::ss_ACTIVE : + res="ACTIVE "; + break; + default: + res="Unknow "; + }; + return(res); +} + void CGenNodeStateless::free_stl_node(){ /* if we have cache mbuf free it */ @@ -43,11 +115,54 @@ void CGenNodeStateless::free_stl_node(){ } +bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){ + m_active_streams-=d; /* reduce the number of streams */ + if (m_active_streams == 0) { + return (true); + } + return (false); +} + + +void TrexStatelessDpPerPort::stop_traffic(uint8_t port_id){ + + assert(m_state==TrexStatelessDpPerPort::ppSTATE_TRANSMITTING); + + for (auto dp_stream : m_active_nodes) { + CGenNodeStateless * node =dp_stream.m_node; + assert(node->get_port_id() == port_id); + if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) { + node->mark_for_free(); + m_active_streams--; + dp_stream.DeleteOnlyStream(); + + }else{ + dp_stream.Delete(m_core); + } + } + + /* active stream should be zero */ + assert(m_active_streams==0); + m_active_nodes.clear(); + m_state=TrexStatelessDpPerPort::ppSTATE_IDLE; +} + + +void TrexStatelessDpPerPort::create(CFlowGenListPerThread * core){ + m_core=core; + m_state=TrexStatelessDpPerPort::ppSTATE_IDLE; + m_port_id=0; + m_active_streams=0; + m_active_nodes.clear(); +} + + void TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) { m_thread_id = thread_id; m_core = core; + m_local_port_offset = 2*core->getDualPortId(); CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp(); @@ -55,8 +170,54 @@ TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) { m_ring_to_cp = cp_dp->getRingDpToCp(thread_id); m_state = STATE_IDLE; + + int i; + for (i=0; i<NUM_PORTS_PER_CORE; i++) { + m_ports[i].create(core); + } +} + + +/* move to the next stream, old stream move to INACTIVE */ +bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node, + CGenNodeStateless * next_node){ + + assert(cur_node); + TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id); + bool schedule =false; + + bool to_stop_port=false; + + if (next_node == NULL) { + /* there is no next stream , reduce the number of active streams*/ + to_stop_port = lp_port->update_number_of_active_streams(1); + + }else{ + uint8_t state=next_node->get_state(); + + /* can't be FREE_RESUSE */ + assert(state != CGenNodeStateless::ss_FREE_RESUSE); + if (next_node->get_state() == CGenNodeStateless::ss_INACTIVE ) { + + /* refill start info and scedule, no update in active streams */ + next_node->refresh(); + schedule = true; + + }else{ + to_stop_port = lp_port->update_number_of_active_streams(1); + } + } + + if ( to_stop_port ) { + /* call stop port explictly to move the state */ + stop_traffic(cur_node->m_port_id); + } + + return ( schedule ); } + + /** * in idle state loop, the processor most of the time sleeps * and periodically checks for messages @@ -76,7 +237,8 @@ TrexStatelessDpCore::idle_state_loop() { void TrexStatelessDpCore::quit_main_loop(){ m_core->set_terminate_mode(true); /* mark it as terminated */ - add_duration(0.0001); /* add message to terminate */ + m_state = STATE_TERMINATE; + add_global_duration(0.0001); } @@ -97,6 +259,7 @@ TrexStatelessDpCore::start_scheduler() { double old_offset = 0.0; m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset); + /* TBD do we need that ? */ m_core->m_node_gen.close_file(m_core); } @@ -105,6 +268,11 @@ void TrexStatelessDpCore::run_once(){ idle_state_loop(); + + if ( m_state == STATE_TERMINATE ){ + return; + } + start_scheduler(); } @@ -121,8 +289,25 @@ TrexStatelessDpCore::start() { } } -void -TrexStatelessDpCore::add_duration(double duration){ +/* only if both port are idle we can exit */ +void +TrexStatelessDpCore::schedule_exit(){ + + CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ; + + node->m_type = CGenNode::COMMAND; + + node->m_cmd = new TrexStatelessDpCanQuit(); + + /* make sure it will be scheduled after the current node */ + node->m_time = m_core->m_cur_time_sec ; + + m_core->m_node_gen.add_node((CGenNode *)node); +} + + +void +TrexStatelessDpCore::add_global_duration(double duration){ if (duration > 0.0) { CGenNode *node = m_core->create_node() ; @@ -135,9 +320,28 @@ TrexStatelessDpCore::add_duration(double duration){ } } +/* add per port exit */ +void +TrexStatelessDpCore::add_port_duration(double duration, + uint8_t port_id){ + if (duration > 0.0) { + CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ; + + node->m_type = CGenNode::COMMAND; + + /* make sure it will be scheduled after the current node */ + node->m_time = m_core->m_cur_time_sec + duration ; + + node->m_cmd = new TrexStatelessDpStop(port_id); + + m_core->m_node_gen.add_node((CGenNode *)node); + } +} + void -TrexStatelessDpCore::add_cont_stream(TrexStream * stream, +TrexStatelessDpCore::add_cont_stream(TrexStatelessDpPerPort * lp_port, + TrexStream * stream, TrexStreamsCompiledObj *comp) { CGenNodeStateless *node = m_core->create_node_sl(); @@ -145,6 +349,19 @@ TrexStatelessDpCore::add_cont_stream(TrexStream * stream, /* add periodic */ node->m_type = CGenNode::STATELESS_PKT; + node->m_ref_stream_info = stream->clone_as_dp(); + + node->m_next_stream=0; /* will be fixed later */ + + + if ( stream->m_self_start ){ + /* if self start it is in active mode */ + node->m_state =CGenNodeStateless::ss_ACTIVE; + lp_port->m_active_streams++; + }else{ + node->m_state =CGenNodeStateless::ss_INACTIVE; + } + node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec); pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id); @@ -166,6 +383,10 @@ TrexStatelessDpCore::add_cont_stream(TrexStream * stream, switch ( stream->m_type ) { case TrexStream::stCONTINUOUS : + node->m_single_burst=0; + node->m_single_burst_refill=0; + node->m_multi_bursts=0; + node->m_ibg_sec = 0.0; break; case TrexStream::stSINGLE_BURST : @@ -187,7 +408,6 @@ TrexStatelessDpCore::add_cont_stream(TrexStream * stream, assert(0); }; - node->m_is_stream_active = 1; node->m_port_id = stream->m_port_id; /* allocate const mbuf */ @@ -208,59 +428,93 @@ TrexStatelessDpCore::add_cont_stream(TrexStream * stream, /* set the packet as a readonly */ node->set_cache_mbuf(m); - /* keep track */ - m_active_nodes.push_back(node); + CDpOneStream one_stream; + + one_stream.m_dp_stream = node->m_ref_stream_info; + one_stream.m_node =node; + + lp_port->m_active_nodes.push_back(one_stream); /* schedule */ m_core->m_node_gen.add_node((CGenNode *)node); - m_state = TrexStatelessDpCore::STATE_TRANSMITTING; - } void -TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj, double duration) { +TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj, + double duration) { + +#if 0 + /* TBD to remove ! */ + obj->Dump(stdout); +#endif + + TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id()); + lp_port->m_active_streams = 0; + /* no nodes in the list */ + assert(lp_port->m_active_nodes.size()==0); + + for (auto single_stream : obj->get_objects()) { + /* all commands should be for the same port */ + assert(obj->get_port_id() == single_stream.m_stream->m_port_id); + add_cont_stream(lp_port,single_stream.m_stream,obj); + } + + uint32_t nodes = lp_port->m_active_nodes.size(); + /* find next stream */ + assert(nodes == obj->get_objects().size()); + + int cnt=0; + + /* set the next_stream pointer */ for (auto single_stream : obj->get_objects()) { - add_cont_stream(single_stream.m_stream,obj); + + if (single_stream.m_stream->is_dp_next_stream() ) { + int stream_id = single_stream.m_stream->m_next_stream_id; + assert(stream_id<nodes); + /* point to the next stream , stream_id is fixed */ + lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ; + } + cnt++; } + lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING; + m_state = TrexStatelessDpCore::STATE_TRANSMITTING; + + if ( duration > 0.0 ){ - add_duration( duration ); + add_port_duration( duration ,obj->get_port_id() ); + } +} + + +bool TrexStatelessDpCore::are_all_ports_idle(){ + + bool res=true; + int i; + for (i=0; i<NUM_PORTS_PER_CORE; i++) { + if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){ + res=false; + } } + return (res); } + void TrexStatelessDpCore::stop_traffic(uint8_t port_id) { /* we cannot remove nodes not from the top of the queue so for every active node - make sure next time the scheduler invokes it, it will be free */ - for (auto node : m_active_nodes) { - if (node->m_port_id == port_id) { - node->m_is_stream_active = 0; - } - } - /* remove all the non active nodes */ - auto pred = std::remove_if(m_active_nodes.begin(), - m_active_nodes.end(), - [](CGenNodeStateless *node) { return (!node->m_is_stream_active); }); + TrexStatelessDpPerPort * lp_port = get_port_db(port_id); - m_active_nodes.erase(pred, m_active_nodes.end()); + lp_port->stop_traffic(port_id); - if (m_active_nodes.size() == 0) { - m_state = STATE_IDLE; - /* stop the scheduler */ + if ( are_all_ports_idle() ) { - CGenNode *node = m_core->create_node() ; - - node->m_type = CGenNode::EXIT_SCHED; - - /* make sure it will be scheduled after the current node */ - node->m_time = m_core->m_cur_time_sec + 0.0001; - - m_core->m_node_gen.add_node(node); + schedule_exit(); } - } /** diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h index aaa6eed3..28fff2fb 100644 --- a/src/stateless/dp/trex_stateless_dp_core.h +++ b/src/stateless/dp/trex_stateless_dp_core.h @@ -1,5 +1,6 @@ /* Itay Marom + Hanoch Haim Cisco Systems, Inc. */ @@ -33,6 +34,52 @@ class CGenNodeStateless; class TrexStreamsCompiledObj; class TrexStream; + +class CDpOneStream { +public: + void Create(){ + } + + void Delete(CFlowGenListPerThread * core); + void DeleteOnlyStream(); + + CGenNodeStateless * m_node; // schedule node + TrexStream * m_dp_stream; // stream info +}; + +class TrexStatelessDpPerPort { + +public: + /* states */ + enum state_e { + ppSTATE_IDLE, + ppSTATE_TRANSMITTING + }; + +public: + TrexStatelessDpPerPort(){ + } + + void create(CFlowGenListPerThread * core); + + void stop_traffic(uint8_t port_id); + + bool update_number_of_active_streams(uint32_t d); + +public: + + state_e m_state; + uint8_t m_port_id; + + uint32_t m_active_streams; /* how many active streams on this port */ + + std::vector<CDpOneStream> m_active_nodes; /* holds the current active nodes */ + CFlowGenListPerThread * m_core ; +}; + +/* for now */ +#define NUM_PORTS_PER_CORE 2 + class TrexStatelessDpCore { public: @@ -40,7 +87,9 @@ public: /* states */ enum state_e { STATE_IDLE, - STATE_TRANSMITTING + STATE_TRANSMITTING, + STATE_TERMINATE, + }; TrexStatelessDpCore() { @@ -83,6 +132,11 @@ public: */ void stop_traffic(uint8_t port_id); + + /* return if all ports are idel */ + bool are_all_ports_idle(); + + /** * check for and handle messages from CP * @@ -112,7 +166,23 @@ public: /* quit the main loop, work in both stateless in stateful, don't free memory trigger from master */ void quit_main_loop(); + bool set_stateless_next_node(CGenNodeStateless * cur_node, + CGenNodeStateless * next_node); + private: + + + TrexStatelessDpPerPort * get_port_db(uint8_t port_id){ + assert((m_local_port_offset==port_id) ||(m_local_port_offset+1==port_id)); + uint8_t local_port_id = port_id -m_local_port_offset; + assert(local_port_id<NUM_PORTS_PER_CORE); + return (&m_ports[local_port_id]); + } + + + void schedule_exit(); + + /** * in idle state loop, the processor most of the time sleeps * and periodically checks for messages @@ -135,21 +205,27 @@ private: */ void handle_cp_msg(TrexStatelessCpToDpMsgBase *msg); - /* add global exit */ - void add_duration(double duration); - void add_cont_stream(TrexStream * stream,TrexStreamsCompiledObj *comp); + void add_port_duration(double duration, + uint8_t port_id); + + void add_global_duration(double duration); + + void add_cont_stream(TrexStatelessDpPerPort * lp_port, + TrexStream * stream, + TrexStreamsCompiledObj *comp); uint8_t m_thread_id; - state_e m_state; + uint8_t m_local_port_offset; + + state_e m_state; /* state of all ports */ CNodeRing *m_ring_from_cp; CNodeRing *m_ring_to_cp; - /* holds the current active nodes */ - std::vector<CGenNodeStateless *> m_active_nodes; + TrexStatelessDpPerPort m_ports[NUM_PORTS_PER_CORE]; /* pointer to the main object */ - CFlowGenListPerThread *m_core; + CFlowGenListPerThread * m_core; double m_duration; }; diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h index e4cf964d..1e53887b 100644 --- a/src/stateless/dp/trex_stream_node.h +++ b/src/stateless/dp/trex_stream_node.h @@ -1,5 +1,6 @@ /* Itay Marom + Hanoh Haim Cisco Systems, Inc. */ @@ -27,20 +28,51 @@ limitations under the License. class TrexStatelessDpCore; #include <trex_stream.h> +class TrexStatelessCpToDpMsgBase; + +struct CGenNodeCommand : public CGenNodeBase { + +friend class TrexStatelessDpCore; + +public: + TrexStatelessCpToDpMsgBase * m_cmd; + + uint8_t m_pad_end[104]; + +public: + void free_command(); + +} __rte_cache_aligned;; + + +static_assert(sizeof(CGenNodeCommand) == sizeof(CGenNode), "sizeof(CGenNodeCommand) != sizeof(CGenNode)" ); + /* this is a event for stateless */ struct CGenNodeStateless : public CGenNodeBase { friend class TrexStatelessDpCore; +public: + enum { + ss_FREE_RESUSE =1, /* should be free by scheduler */ + ss_INACTIVE =2, /* will be active by other stream or stopped */ + ss_ACTIVE =3 /* the stream is active */ + }; + typedef uint8_t stream_state_t ; + + static std::string get_stream_state_str(stream_state_t stream_state); + private: + /* cache line 0 */ + /* important stuff here */ void * m_cache_mbuf; double m_next_time_offset; /* in sec */ double m_ibg_sec; /* inter burst time in sec */ - uint8_t m_is_stream_active; + stream_state_t m_state; uint8_t m_port_id; - uint8_t m_stream_type; /* TrexStream::STREAM_TYPE */ + uint8_t m_stream_type; /* see TrexStream::STREAM_TYPE ,stream_type_t */ uint8_t m_pad; uint32_t m_single_burst; /* the number of bursts in case of burst */ @@ -48,14 +80,40 @@ private: uint32_t m_multi_bursts; /* in case of multi_burst how many bursts */ - + /* cache line 1 */ + TrexStream * m_ref_stream_info; /* the stream info */ + CGenNodeStateless * m_next_stream; /* pad to match the size of CGenNode */ - uint8_t m_pad_end[65]; + uint8_t m_pad_end[56]; + + public: + uint8_t get_port_id(){ + return (m_port_id); + } + + + /* we restart the stream, schedule it using stream isg */ + inline void update_refresh_time(double cur_time){ + m_time = cur_time + usec_to_sec(m_ref_stream_info->m_isg_usec); + } + + inline bool is_mask_for_free(){ + return (get_state() == CGenNodeStateless::ss_FREE_RESUSE ?true:false); + + } + inline void mark_for_free(){ + set_state(CGenNodeStateless::ss_FREE_RESUSE); + /* only to be safe */ + m_ref_stream_info= NULL; + m_next_stream= NULL; + + } + inline uint8_t get_stream_type(){ return (m_stream_type); } @@ -72,11 +130,16 @@ public: return (m_multi_bursts); } + inline void set_state(stream_state_t new_state){ + m_state=new_state; + } - inline bool is_active() { - return m_is_stream_active; + + inline stream_state_t get_state() { + return m_state; } + void refresh(); inline void handle_continues(CFlowGenListPerThread *thread) { thread->m_node_gen.m_v_if->send_node( (CGenNode *)this); @@ -100,14 +163,22 @@ public: }else{ m_multi_bursts--; if ( m_multi_bursts == 0 ) { - /* stop */ - m_is_stream_active =0; + set_state(CGenNodeStateless::ss_INACTIVE); + if ( thread->set_stateless_next_node(this,m_next_stream) ){ + /* update the next stream time using isg */ + m_next_stream->update_refresh_time(m_time); + + thread->m_node_gen.m_p_queue.push( (CGenNode *)m_next_stream); + }else{ + // in case of zero we will schedule a command to stop + // will be called from set_stateless_next_node + } + }else{ m_time += m_ibg_sec; m_single_burst = m_single_burst_refill; - + thread->m_node_gen.m_p_queue.push( (CGenNode *)this); } - thread->m_node_gen.m_p_queue.push( (CGenNode *)this); } } @@ -168,10 +239,14 @@ public: void free_stl_node(); +public: + /* debug functions */ - void Dump(FILE *fd){ - fprintf(fd," %f, %lu, %lu \n",m_time,(ulong)m_port_id,(ulong)get_mbuf_cache_dir()); - } + int get_stream_id(); + + static void DumpHeader(FILE *fd); + + void Dump(FILE *fd); } __rte_cache_aligned; |