summaryrefslogtreecommitdiffstats
path: root/src/stateless/dp
diff options
context:
space:
mode:
authorHanoh Haim <hhaim@cisco.com>2015-11-18 16:23:55 +0200
committerHanoh Haim <hhaim@cisco.com>2015-11-18 16:23:55 +0200
commit0e8c9ae666d61897cb405c469a71be09d54a649b (patch)
treea35686155fa9f5834acd85ae6f1f4beeeab59b12 /src/stateless/dp
parentaa9bf54e6f892168482ed647a0e67ab10b1cf34a (diff)
add support for a program of streams. refactor the dp code
Diffstat (limited to 'src/stateless/dp')
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp328
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h92
-rw-r--r--src/stateless/dp/trex_stream_node.h101
3 files changed, 463 insertions, 58 deletions
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index b25a4cfc..640fdb4d 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -26,12 +27,83 @@ limitations under the License.
#include <bp_sim.h>
-static inline double
-usec_to_sec(double usec) {
- return (usec / (1000 * 1000));
+
+void CDpOneStream::Delete(CFlowGenListPerThread * core){
+ assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
+ core->free_node((CGenNode *)m_node);
+ delete m_dp_stream;
+ m_node=0;
+ m_dp_stream=0;
+}
+
+void CDpOneStream::DeleteOnlyStream(){
+ assert(m_dp_stream);
+ delete m_dp_stream;
+ m_dp_stream=0;
+}
+
+int CGenNodeStateless::get_stream_id(){
+ if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
+ return (-1); // not valid
+ }
+ assert(m_ref_stream_info);
+ return ((int)m_ref_stream_info->m_stream_id);
+}
+
+
+void CGenNodeStateless::DumpHeader(FILE *fd){
+ fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
+
+}
+void CGenNodeStateless::Dump(FILE *fd){
+ fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu \n",
+ m_time,
+ (ulong)m_port_id,
+ "s-pkt", //action
+ get_stream_state_str(m_state ).c_str(),
+ get_stream_id(), //stream_id
+ TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
+ (ulong)m_multi_bursts,
+ (ulong)m_single_burst
+ );
+}
+
+
+void CGenNodeStateless::refresh(){
+
+ /* refill the stream info */
+ m_single_burst = m_single_burst_refill;
+ m_multi_bursts = m_ref_stream_info->m_num_bursts;
+ m_state = CGenNodeStateless::ss_ACTIVE;
+}
+
+
+
+void CGenNodeCommand::free_command(){
+ assert(m_cmd);
+ delete m_cmd;
}
+std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
+ std::string res;
+
+ switch (stream_state) {
+ case CGenNodeStateless::ss_FREE_RESUSE :
+ res="FREE ";
+ break;
+ case CGenNodeStateless::ss_INACTIVE :
+ res="INACTIVE ";
+ break;
+ case CGenNodeStateless::ss_ACTIVE :
+ res="ACTIVE ";
+ break;
+ default:
+ res="Unknow ";
+ };
+ return(res);
+}
+
void CGenNodeStateless::free_stl_node(){
/* if we have cache mbuf free it */
@@ -43,11 +115,54 @@ void CGenNodeStateless::free_stl_node(){
}
+bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
+ m_active_streams-=d; /* reduce the number of streams */
+ if (m_active_streams == 0) {
+ return (true);
+ }
+ return (false);
+}
+
+
+void TrexStatelessDpPerPort::stop_traffic(uint8_t port_id){
+
+ assert(m_state==TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
+
+ for (auto dp_stream : m_active_nodes) {
+ CGenNodeStateless * node =dp_stream.m_node;
+ assert(node->get_port_id() == port_id);
+ if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
+ node->mark_for_free();
+ m_active_streams--;
+ dp_stream.DeleteOnlyStream();
+
+ }else{
+ dp_stream.Delete(m_core);
+ }
+ }
+
+ /* active stream should be zero */
+ assert(m_active_streams==0);
+ m_active_nodes.clear();
+ m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
+}
+
+
+void TrexStatelessDpPerPort::create(CFlowGenListPerThread * core){
+ m_core=core;
+ m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
+ m_port_id=0;
+ m_active_streams=0;
+ m_active_nodes.clear();
+}
+
+
void
TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
m_thread_id = thread_id;
m_core = core;
+ m_local_port_offset = 2*core->getDualPortId();
CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
@@ -55,8 +170,54 @@ TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
m_ring_to_cp = cp_dp->getRingDpToCp(thread_id);
m_state = STATE_IDLE;
+
+ int i;
+ for (i=0; i<NUM_PORTS_PER_CORE; i++) {
+ m_ports[i].create(core);
+ }
+}
+
+
+/* move to the next stream, old stream move to INACTIVE */
+bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
+ CGenNodeStateless * next_node){
+
+ assert(cur_node);
+ TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
+ bool schedule =false;
+
+ bool to_stop_port=false;
+
+ if (next_node == NULL) {
+ /* there is no next stream , reduce the number of active streams*/
+ to_stop_port = lp_port->update_number_of_active_streams(1);
+
+ }else{
+ uint8_t state=next_node->get_state();
+
+ /* can't be FREE_RESUSE */
+ assert(state != CGenNodeStateless::ss_FREE_RESUSE);
+ if (next_node->get_state() == CGenNodeStateless::ss_INACTIVE ) {
+
+ /* refill start info and scedule, no update in active streams */
+ next_node->refresh();
+ schedule = true;
+
+ }else{
+ to_stop_port = lp_port->update_number_of_active_streams(1);
+ }
+ }
+
+ if ( to_stop_port ) {
+ /* call stop port explictly to move the state */
+ stop_traffic(cur_node->m_port_id);
+ }
+
+ return ( schedule );
}
+
+
/**
* in idle state loop, the processor most of the time sleeps
* and periodically checks for messages
@@ -76,7 +237,8 @@ TrexStatelessDpCore::idle_state_loop() {
void TrexStatelessDpCore::quit_main_loop(){
m_core->set_terminate_mode(true); /* mark it as terminated */
- add_duration(0.0001); /* add message to terminate */
+ m_state = STATE_TERMINATE;
+ add_global_duration(0.0001);
}
@@ -97,6 +259,7 @@ TrexStatelessDpCore::start_scheduler() {
double old_offset = 0.0;
m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
+ /* TBD do we need that ? */
m_core->m_node_gen.close_file(m_core);
}
@@ -105,6 +268,11 @@ void
TrexStatelessDpCore::run_once(){
idle_state_loop();
+
+ if ( m_state == STATE_TERMINATE ){
+ return;
+ }
+
start_scheduler();
}
@@ -121,8 +289,25 @@ TrexStatelessDpCore::start() {
}
}
-void
-TrexStatelessDpCore::add_duration(double duration){
+/* only if both port are idle we can exit */
+void
+TrexStatelessDpCore::schedule_exit(){
+
+ CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
+
+ node->m_type = CGenNode::COMMAND;
+
+ node->m_cmd = new TrexStatelessDpCanQuit();
+
+ /* make sure it will be scheduled after the current node */
+ node->m_time = m_core->m_cur_time_sec ;
+
+ m_core->m_node_gen.add_node((CGenNode *)node);
+}
+
+
+void
+TrexStatelessDpCore::add_global_duration(double duration){
if (duration > 0.0) {
CGenNode *node = m_core->create_node() ;
@@ -135,9 +320,28 @@ TrexStatelessDpCore::add_duration(double duration){
}
}
+/* add per port exit */
+void
+TrexStatelessDpCore::add_port_duration(double duration,
+ uint8_t port_id){
+ if (duration > 0.0) {
+ CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
+
+ node->m_type = CGenNode::COMMAND;
+
+ /* make sure it will be scheduled after the current node */
+ node->m_time = m_core->m_cur_time_sec + duration ;
+
+ node->m_cmd = new TrexStatelessDpStop(port_id);
+
+ m_core->m_node_gen.add_node((CGenNode *)node);
+ }
+}
+
void
-TrexStatelessDpCore::add_cont_stream(TrexStream * stream,
+TrexStatelessDpCore::add_cont_stream(TrexStatelessDpPerPort * lp_port,
+ TrexStream * stream,
TrexStreamsCompiledObj *comp) {
CGenNodeStateless *node = m_core->create_node_sl();
@@ -145,6 +349,19 @@ TrexStatelessDpCore::add_cont_stream(TrexStream * stream,
/* add periodic */
node->m_type = CGenNode::STATELESS_PKT;
+ node->m_ref_stream_info = stream->clone_as_dp();
+
+ node->m_next_stream=0; /* will be fixed later */
+
+
+ if ( stream->m_self_start ){
+ /* if self start it is in active mode */
+ node->m_state =CGenNodeStateless::ss_ACTIVE;
+ lp_port->m_active_streams++;
+ }else{
+ node->m_state =CGenNodeStateless::ss_INACTIVE;
+ }
+
node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);
pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
@@ -166,6 +383,10 @@ TrexStatelessDpCore::add_cont_stream(TrexStream * stream,
switch ( stream->m_type ) {
case TrexStream::stCONTINUOUS :
+ node->m_single_burst=0;
+ node->m_single_burst_refill=0;
+ node->m_multi_bursts=0;
+ node->m_ibg_sec = 0.0;
break;
case TrexStream::stSINGLE_BURST :
@@ -187,7 +408,6 @@ TrexStatelessDpCore::add_cont_stream(TrexStream * stream,
assert(0);
};
- node->m_is_stream_active = 1;
node->m_port_id = stream->m_port_id;
/* allocate const mbuf */
@@ -208,59 +428,93 @@ TrexStatelessDpCore::add_cont_stream(TrexStream * stream,
/* set the packet as a readonly */
node->set_cache_mbuf(m);
- /* keep track */
- m_active_nodes.push_back(node);
+ CDpOneStream one_stream;
+
+ one_stream.m_dp_stream = node->m_ref_stream_info;
+ one_stream.m_node =node;
+
+ lp_port->m_active_nodes.push_back(one_stream);
/* schedule */
m_core->m_node_gen.add_node((CGenNode *)node);
- m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
-
}
void
-TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj, double duration) {
+TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
+ double duration) {
+
+#if 0
+ /* TBD to remove ! */
+ obj->Dump(stdout);
+#endif
+
+ TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
+ lp_port->m_active_streams = 0;
+ /* no nodes in the list */
+ assert(lp_port->m_active_nodes.size()==0);
+
+ for (auto single_stream : obj->get_objects()) {
+ /* all commands should be for the same port */
+ assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
+ add_cont_stream(lp_port,single_stream.m_stream,obj);
+ }
+
+ uint32_t nodes = lp_port->m_active_nodes.size();
+ /* find next stream */
+ assert(nodes == obj->get_objects().size());
+
+ int cnt=0;
+
+ /* set the next_stream pointer */
for (auto single_stream : obj->get_objects()) {
- add_cont_stream(single_stream.m_stream,obj);
+
+ if (single_stream.m_stream->is_dp_next_stream() ) {
+ int stream_id = single_stream.m_stream->m_next_stream_id;
+ assert(stream_id<nodes);
+ /* point to the next stream , stream_id is fixed */
+ lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
+ }
+ cnt++;
}
+ lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
+ m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
+
+
if ( duration > 0.0 ){
- add_duration( duration );
+ add_port_duration( duration ,obj->get_port_id() );
+ }
+}
+
+
+bool TrexStatelessDpCore::are_all_ports_idle(){
+
+ bool res=true;
+ int i;
+ for (i=0; i<NUM_PORTS_PER_CORE; i++) {
+ if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
+ res=false;
+ }
}
+ return (res);
}
+
void
TrexStatelessDpCore::stop_traffic(uint8_t port_id) {
/* we cannot remove nodes not from the top of the queue so
for every active node - make sure next time
the scheduler invokes it, it will be free */
- for (auto node : m_active_nodes) {
- if (node->m_port_id == port_id) {
- node->m_is_stream_active = 0;
- }
- }
- /* remove all the non active nodes */
- auto pred = std::remove_if(m_active_nodes.begin(),
- m_active_nodes.end(),
- [](CGenNodeStateless *node) { return (!node->m_is_stream_active); });
+ TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
- m_active_nodes.erase(pred, m_active_nodes.end());
+ lp_port->stop_traffic(port_id);
- if (m_active_nodes.size() == 0) {
- m_state = STATE_IDLE;
- /* stop the scheduler */
+ if ( are_all_ports_idle() ) {
- CGenNode *node = m_core->create_node() ;
-
- node->m_type = CGenNode::EXIT_SCHED;
-
- /* make sure it will be scheduled after the current node */
- node->m_time = m_core->m_cur_time_sec + 0.0001;
-
- m_core->m_node_gen.add_node(node);
+ schedule_exit();
}
-
}
/**
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index aaa6eed3..28fff2fb 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -33,6 +34,52 @@ class CGenNodeStateless;
class TrexStreamsCompiledObj;
class TrexStream;
+
+class CDpOneStream {
+public:
+ void Create(){
+ }
+
+ void Delete(CFlowGenListPerThread * core);
+ void DeleteOnlyStream();
+
+ CGenNodeStateless * m_node; // schedule node
+ TrexStream * m_dp_stream; // stream info
+};
+
+class TrexStatelessDpPerPort {
+
+public:
+ /* states */
+ enum state_e {
+ ppSTATE_IDLE,
+ ppSTATE_TRANSMITTING
+ };
+
+public:
+ TrexStatelessDpPerPort(){
+ }
+
+ void create(CFlowGenListPerThread * core);
+
+ void stop_traffic(uint8_t port_id);
+
+ bool update_number_of_active_streams(uint32_t d);
+
+public:
+
+ state_e m_state;
+ uint8_t m_port_id;
+
+ uint32_t m_active_streams; /* how many active streams on this port */
+
+ std::vector<CDpOneStream> m_active_nodes; /* holds the current active nodes */
+ CFlowGenListPerThread * m_core ;
+};
+
+/* for now */
+#define NUM_PORTS_PER_CORE 2
+
class TrexStatelessDpCore {
public:
@@ -40,7 +87,9 @@ public:
/* states */
enum state_e {
STATE_IDLE,
- STATE_TRANSMITTING
+ STATE_TRANSMITTING,
+ STATE_TERMINATE,
+
};
TrexStatelessDpCore() {
@@ -83,6 +132,11 @@ public:
*/
void stop_traffic(uint8_t port_id);
+
+ /* return if all ports are idel */
+ bool are_all_ports_idle();
+
+
/**
* check for and handle messages from CP
*
@@ -112,7 +166,23 @@ public:
/* quit the main loop, work in both stateless in stateful, don't free memory trigger from master */
void quit_main_loop();
+ bool set_stateless_next_node(CGenNodeStateless * cur_node,
+ CGenNodeStateless * next_node);
+
private:
+
+
+ TrexStatelessDpPerPort * get_port_db(uint8_t port_id){
+ assert((m_local_port_offset==port_id) ||(m_local_port_offset+1==port_id));
+ uint8_t local_port_id = port_id -m_local_port_offset;
+ assert(local_port_id<NUM_PORTS_PER_CORE);
+ return (&m_ports[local_port_id]);
+ }
+
+
+ void schedule_exit();
+
+
/**
* in idle state loop, the processor most of the time sleeps
* and periodically checks for messages
@@ -135,21 +205,27 @@ private:
*/
void handle_cp_msg(TrexStatelessCpToDpMsgBase *msg);
- /* add global exit */
- void add_duration(double duration);
- void add_cont_stream(TrexStream * stream,TrexStreamsCompiledObj *comp);
+ void add_port_duration(double duration,
+ uint8_t port_id);
+
+ void add_global_duration(double duration);
+
+ void add_cont_stream(TrexStatelessDpPerPort * lp_port,
+ TrexStream * stream,
+ TrexStreamsCompiledObj *comp);
uint8_t m_thread_id;
- state_e m_state;
+ uint8_t m_local_port_offset;
+
+ state_e m_state; /* state of all ports */
CNodeRing *m_ring_from_cp;
CNodeRing *m_ring_to_cp;
- /* holds the current active nodes */
- std::vector<CGenNodeStateless *> m_active_nodes;
+ TrexStatelessDpPerPort m_ports[NUM_PORTS_PER_CORE];
/* pointer to the main object */
- CFlowGenListPerThread *m_core;
+ CFlowGenListPerThread * m_core;
double m_duration;
};
diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h
index e4cf964d..1e53887b 100644
--- a/src/stateless/dp/trex_stream_node.h
+++ b/src/stateless/dp/trex_stream_node.h
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoh Haim
Cisco Systems, Inc.
*/
@@ -27,20 +28,51 @@ limitations under the License.
class TrexStatelessDpCore;
#include <trex_stream.h>
+class TrexStatelessCpToDpMsgBase;
+
+struct CGenNodeCommand : public CGenNodeBase {
+
+friend class TrexStatelessDpCore;
+
+public:
+ TrexStatelessCpToDpMsgBase * m_cmd;
+
+ uint8_t m_pad_end[104];
+
+public:
+ void free_command();
+
+} __rte_cache_aligned;;
+
+
+static_assert(sizeof(CGenNodeCommand) == sizeof(CGenNode), "sizeof(CGenNodeCommand) != sizeof(CGenNode)" );
+
/* this is a event for stateless */
struct CGenNodeStateless : public CGenNodeBase {
friend class TrexStatelessDpCore;
+public:
+ enum {
+ ss_FREE_RESUSE =1, /* should be free by scheduler */
+ ss_INACTIVE =2, /* will be active by other stream or stopped */
+ ss_ACTIVE =3 /* the stream is active */
+ };
+ typedef uint8_t stream_state_t ;
+
+ static std::string get_stream_state_str(stream_state_t stream_state);
+
private:
+ /* cache line 0 */
+ /* important stuff here */
void * m_cache_mbuf;
double m_next_time_offset; /* in sec */
double m_ibg_sec; /* inter burst time in sec */
- uint8_t m_is_stream_active;
+ stream_state_t m_state;
uint8_t m_port_id;
- uint8_t m_stream_type; /* TrexStream::STREAM_TYPE */
+ uint8_t m_stream_type; /* see TrexStream::STREAM_TYPE ,stream_type_t */
uint8_t m_pad;
uint32_t m_single_burst; /* the number of bursts in case of burst */
@@ -48,14 +80,40 @@ private:
uint32_t m_multi_bursts; /* in case of multi_burst how many bursts */
-
+ /* cache line 1 */
+ TrexStream * m_ref_stream_info; /* the stream info */
+ CGenNodeStateless * m_next_stream;
/* pad to match the size of CGenNode */
- uint8_t m_pad_end[65];
+ uint8_t m_pad_end[56];
+
+
public:
+ uint8_t get_port_id(){
+ return (m_port_id);
+ }
+
+
+ /* we restart the stream, schedule it using stream isg */
+ inline void update_refresh_time(double cur_time){
+ m_time = cur_time + usec_to_sec(m_ref_stream_info->m_isg_usec);
+ }
+
+ inline bool is_mask_for_free(){
+ return (get_state() == CGenNodeStateless::ss_FREE_RESUSE ?true:false);
+
+ }
+ inline void mark_for_free(){
+ set_state(CGenNodeStateless::ss_FREE_RESUSE);
+ /* only to be safe */
+ m_ref_stream_info= NULL;
+ m_next_stream= NULL;
+
+ }
+
inline uint8_t get_stream_type(){
return (m_stream_type);
}
@@ -72,11 +130,16 @@ public:
return (m_multi_bursts);
}
+ inline void set_state(stream_state_t new_state){
+ m_state=new_state;
+ }
- inline bool is_active() {
- return m_is_stream_active;
+
+ inline stream_state_t get_state() {
+ return m_state;
}
+ void refresh();
inline void handle_continues(CFlowGenListPerThread *thread) {
thread->m_node_gen.m_v_if->send_node( (CGenNode *)this);
@@ -100,14 +163,22 @@ public:
}else{
m_multi_bursts--;
if ( m_multi_bursts == 0 ) {
- /* stop */
- m_is_stream_active =0;
+ set_state(CGenNodeStateless::ss_INACTIVE);
+ if ( thread->set_stateless_next_node(this,m_next_stream) ){
+ /* update the next stream time using isg */
+ m_next_stream->update_refresh_time(m_time);
+
+ thread->m_node_gen.m_p_queue.push( (CGenNode *)m_next_stream);
+ }else{
+ // in case of zero we will schedule a command to stop
+ // will be called from set_stateless_next_node
+ }
+
}else{
m_time += m_ibg_sec;
m_single_burst = m_single_burst_refill;
-
+ thread->m_node_gen.m_p_queue.push( (CGenNode *)this);
}
- thread->m_node_gen.m_p_queue.push( (CGenNode *)this);
}
}
@@ -168,10 +239,14 @@ public:
void free_stl_node();
+public:
+ /* debug functions */
- void Dump(FILE *fd){
- fprintf(fd," %f, %lu, %lu \n",m_time,(ulong)m_port_id,(ulong)get_mbuf_cache_dir());
- }
+ int get_stream_id();
+
+ static void DumpHeader(FILE *fd);
+
+ void Dump(FILE *fd);
} __rte_cache_aligned;