summaryrefslogtreecommitdiffstats
path: root/src/stateless/cp
diff options
context:
space:
mode:
Diffstat (limited to 'src/stateless/cp')
-rw-r--r--src/stateless/cp/trex_dp_port_events.cpp217
-rw-r--r--src/stateless/cp/trex_dp_port_events.h121
-rw-r--r--src/stateless/cp/trex_stateless.cpp2
-rw-r--r--src/stateless/cp/trex_stateless.h6
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp104
-rw-r--r--src/stateless/cp/trex_stateless_port.h16
-rw-r--r--src/stateless/cp/trex_stream.h2
-rw-r--r--src/stateless/cp/trex_streams_compiler.cpp3
8 files changed, 192 insertions, 279 deletions
diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp
index ba327e59..8e098adf 100644
--- a/src/stateless/cp/trex_dp_port_events.cpp
+++ b/src/stateless/cp/trex_dp_port_events.cpp
@@ -20,6 +20,7 @@ limitations under the License.
*/
#include <trex_dp_port_events.h>
+#include <trex_stateless_messaging.h>
#include <sstream>
#include <os_time.h>
#include <trex_stateless.h>
@@ -27,24 +28,20 @@ limitations under the License.
/**
* port events
*/
-void
-TrexDpPortEvents::create(TrexStatelessPort *port) {
+TrexDpPortEvents::TrexDpPortEvents(TrexStatelessPort *port) {
m_port = port;
-
- for (int i = 0; i < TrexDpPortEvent::EVENT_MAX; i++) {
- m_events[i].create((TrexDpPortEvent::event_e) i, port);
- }
-
m_event_id_counter = EVENT_ID_INVALID;
}
-/**
- * generate a new event ID
- *
- */
-int
-TrexDpPortEvents::generate_event_id() {
- return (++m_event_id_counter);
+TrexDpPortEvent *
+TrexDpPortEvents::lookup(int event_id) {
+ auto search = m_events.find(event_id);
+
+ if (search != m_events.end()) {
+ return search->second;
+ } else {
+ return NULL;
+ }
}
/**
@@ -52,21 +49,49 @@ TrexDpPortEvents::generate_event_id() {
* all other events will be disabled
*
*/
-void
-TrexDpPortEvents::wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms) {
+int
+TrexDpPortEvents::create_event(TrexDpPortEvent *event, int timeout_ms) {
+ /* allocate ID for event */
+ int event_id = ++m_event_id_counter;
- /* first disable all events */
- for (TrexDpPortEvent & e : m_events) {
- e.disable();
- }
+ /* init and add */
+ event->init(m_port, event_id, timeout_ms);
+ m_events[event_id] = event;
- /* mark this event as allowed */
- m_events[ev].wait_for_event(event_id, timeout_ms);
+ return event_id;
}
void
-TrexDpPortEvents::disable(TrexDpPortEvent::event_e ev) {
- m_events[ev].disable();
+TrexDpPortEvents::destroy_event(int event_id) {
+ TrexDpPortEvent *event = lookup(event_id);
+ if (!event) {
+ /* cannot find event */
+ throw TrexException("internal error - cannot find event");
+ }
+
+ m_events.erase(event_id);
+ delete event;
+}
+
+class DPBarrier : public TrexDpPortEvent {
+protected:
+ virtual void on_event() {
+ /* do nothing */
+ }
+};
+
+void
+TrexDpPortEvents::barrier() {
+ int barrier_id = create_event(new DPBarrier());
+
+ TrexStatelessCpToDpMsgBase *barrier_msg = new TrexStatelessDpBarrier(m_port->m_port_id, barrier_id);
+ m_port->send_message_to_all_dp(barrier_msg);
+
+ get_stateless_obj()->get_platform_api()->flush_dp_messages();
+ while (lookup(barrier_id) != NULL) {
+ delay(1);
+ get_stateless_obj()->get_platform_api()->flush_dp_messages();
+ }
}
/**
@@ -74,39 +99,33 @@ TrexDpPortEvents::disable(TrexDpPortEvent::event_e ev) {
*
*/
void
-TrexDpPortEvents::handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id) {
- m_events[ev].handle_event(thread_id, event_id);
-}
-
-/***********
- * single event object
- *
- */
+TrexDpPortEvents::on_core_reporting_in(int event_id, int thread_id) {
+ TrexDpPortEvent *event = lookup(event_id);
+ /* event might have been deleted */
+ if (!event) {
+ return;
+ }
-void
-TrexDpPortEvent::create(event_e type, TrexStatelessPort *port) {
- m_event_type = type;
- m_port = port;
+ bool done = event->on_core_reporting_in(thread_id);
- /* add the core ids to the hash */
- m_signal.clear();
- for (int core_id : m_port->get_core_id_list()) {
- m_signal[core_id] = false;
+ if (done) {
+ destroy_event(event_id);
}
-
- /* event is disabled */
- disable();
}
-/**
- * wait the event using event id and timeout
+/***************************
+ * event
*
- */
-void
-TrexDpPortEvent::wait_for_event(int event_id, int timeout_ms) {
+ **************************/
+TrexDpPortEvent::TrexDpPortEvent() {
+ m_port = NULL;
+ m_event_id = -1;
+}
- /* set a new event id */
+void
+TrexDpPortEvent::init(TrexStatelessPort *port, int event_id, int timeout_ms) {
+ m_port = port;
m_event_id = event_id;
/* do we have a timeout ? */
@@ -118,103 +137,33 @@ TrexDpPortEvent::wait_for_event(int event_id, int timeout_ms) {
/* prepare the signal array */
m_pending_cnt = 0;
- for (auto & core_pair : m_signal) {
- core_pair.second = false;
+ for (int core_id : m_port->get_core_id_list()) {
+ m_signal[core_id] = false;
m_pending_cnt++;
}
}
-void
-TrexDpPortEvent::disable() {
- m_event_id = TrexDpPortEvents::EVENT_ID_INVALID;
-}
-
-/**
- * get the event status
- *
- */
-
-TrexDpPortEvent::event_status_e
-TrexDpPortEvent::status() {
-
- /* is it even active ? */
- if (m_event_id == TrexDpPortEvents::EVENT_ID_INVALID) {
- return (EVENT_DISABLE);
- }
-
- /* did it occured ? */
- if (m_pending_cnt == 0) {
- return (EVENT_OCCURED);
- }
-
- /* so we are enabled and the event did not occur - maybe we timed out ? */
- if ( (m_expire_limit_ms > 0) && (os_get_time_msec() > m_expire_limit_ms) ) {
- return (EVENT_TIMED_OUT);
- }
-
- /* so we are still waiting... */
- return (EVENT_PENDING);
-
-}
-
-void
-TrexDpPortEvent::err(int thread_id, int event_id, const std::string &err_msg) {
- std::stringstream err;
- err << "DP event '" << event_name(m_event_type) << "' on thread id '" << thread_id << "' with key '" << event_id <<"' - ";
-}
-
-/**
- * event occured
- *
- */
-void
-TrexDpPortEvent::handle_event(int thread_id, int event_id) {
-
- /* if the event is disabled - we don't care */
- if (!is_active()) {
- return;
- }
-
- /* check the event id is matching the required event - if not maybe its an old signal */
- if (event_id != m_event_id) {
- return;
- }
-
+bool
+TrexDpPortEvent::on_core_reporting_in(int thread_id) {
/* mark sure no double signal */
if (m_signal.at(thread_id)) {
- err(thread_id, event_id, "double signal");
+ std::stringstream err;
+ err << "double signal detected on event id: " << m_event_id;
+ throw TrexException(err.str());
- } else {
- /* mark */
- m_signal.at(thread_id) = true;
- m_pending_cnt--;
}
+ /* mark */
+ m_signal.at(thread_id) = true;
+ m_pending_cnt--;
+
/* event occured */
if (m_pending_cnt == 0) {
- m_port->on_dp_event_occured(m_event_type);
- m_event_id = TrexDpPortEvents::EVENT_ID_INVALID;
+ on_event();
+ return true;
+ } else {
+ return false;
}
}
-bool
-TrexDpPortEvent::is_active() {
- return (status() != EVENT_DISABLE);
-}
-
-bool
-TrexDpPortEvent::has_timeout_expired() {
- return (status() == EVENT_TIMED_OUT);
-}
-
-const char *
-TrexDpPortEvent::event_name(event_e type) {
- switch (type) {
- case EVENT_STOP:
- return "DP STOP";
- default:
- throw TrexException("unknown event type");
- }
-
-}
diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h
index 557e590b..3b8c8633 100644
--- a/src/stateless/cp/trex_dp_port_events.h
+++ b/src/stateless/cp/trex_dp_port_events.h
@@ -25,95 +25,43 @@ limitations under the License.
#include <string>
class TrexStatelessPort;
+class TrexDpPortEvents;
/**
- * describes a single DP event related to port
+ * interface class for DP events
*
- * @author imarom (18-Nov-15)
+ * @author imarom (29-Feb-16)
*/
class TrexDpPortEvent {
-public:
-
- enum event_e {
- EVENT_STOP = 1,
- EVENT_MAX
- };
-
- /**
- * status of the event for the port
- */
- enum event_status_e {
- EVENT_DISABLE,
- EVENT_PENDING,
- EVENT_TIMED_OUT,
- EVENT_OCCURED
- };
-
- /**
- * init for the event
- *
- */
- void create(event_e type, TrexStatelessPort *port);
-
- /**
- * create a new pending event
- *
- */
- void wait_for_event(int event_id, int timeout_ms = -1);
-
- /**
- * mark event as not allowed to happen
- *
- */
- void disable();
-
- /**
- * get the event status
- *
- */
- event_status_e status();
-
- /**
- * event occured
- *
- */
- void handle_event(int thread_id, int event_id);
+ friend TrexDpPortEvents;
- /**
- * returns true if event is active
- *
- */
- bool is_active();
-
- /**
- * has timeout already expired ?
- *
- */
- bool has_timeout_expired();
-
- /**
- * generate error
- *
- */
- void err(int thread_id, int event_id, const std::string &err_msg);
+public:
+ TrexDpPortEvent();
+ virtual ~TrexDpPortEvent() {}
+protected:
/**
- * event to name
+ * what to do when an event has been completed (all cores
+ * reported in
*
+ * @author imarom (29-Feb-16)
*/
- static const char * event_name(event_e type);
+ virtual void on_event() = 0;
+ TrexStatelessPort *get_port() {
+ return m_port;
+ }
private:
+ void init(TrexStatelessPort *port, int event_id, int timeout_ms);
+ bool on_core_reporting_in(int thread_id);
- event_e m_event_type;
std::unordered_map<int, bool> m_signal;
int m_pending_cnt;
TrexStatelessPort *m_port;
int m_event_id;
int m_expire_limit_ms;
-
};
/**
@@ -124,44 +72,39 @@ class TrexDpPortEvents {
public:
friend class TrexDpPortEvent;
- void create(TrexStatelessPort *port);
+ static const int INVALID_ID = -1;
- /**
- * generate a new event ID to be used with wait_for_event
- *
- */
- int generate_event_id();
+ TrexDpPortEvents(TrexStatelessPort *port);
/**
* wait a new DP event on the port
* returns a key which will be used to identify
* the event happened
*
- * @author imarom (18-Nov-15)
- *
- * @param ev - type of event
- * @param event_id - a unique identifier for the event
- * @param timeout_ms - does it has a timeout ?
- *
*/
- void wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms = -1);
+ int create_event(TrexDpPortEvent *event, int timeout_ms = -1);
/**
- * disable an event (don't care)
+ * destroy an event
*
*/
- void disable(TrexDpPortEvent::event_e ev);
+ void destroy_event(int event_id);
/**
- * event has occured
- *
+ * return when all DP cores have responsed on a barrier
+ */
+ void barrier();
+
+ /**
+ * a core has reached the event
*/
- void handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id);
+ void on_core_reporting_in(int event_id, int thread_id);
private:
- static const int EVENT_ID_INVALID = -1;
+ TrexDpPortEvent *lookup(int event_id);
- TrexDpPortEvent m_events[TrexDpPortEvent::EVENT_MAX];
+ static const int EVENT_ID_INVALID = -1;
+ std::unordered_map<int, TrexDpPortEvent *> m_events;
int m_event_id_counter;
TrexStatelessPort *m_port;
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp
index a4522837..9e24802b 100644
--- a/src/stateless/cp/trex_stateless.cpp
+++ b/src/stateless/cp/trex_stateless.cpp
@@ -40,7 +40,7 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) {
/* create RPC servers */
/* set both servers to mutex each other */
- m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_rpc_async_cfg, &m_global_cp_lock);
+ m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_global_lock);
m_rpc_server->set_verbose(cfg.m_rpc_server_verbose);
/* configure ports */
diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h
index cc47da6b..6e5e0c44 100644
--- a/src/stateless/cp/trex_stateless.h
+++ b/src/stateless/cp/trex_stateless.h
@@ -92,18 +92,19 @@ public:
TrexStatelessCfg() {
m_port_count = 0;
m_rpc_req_resp_cfg = NULL;
- m_rpc_async_cfg = NULL;
m_rpc_server_verbose = false;
m_platform_api = NULL;
m_publisher = NULL;
+ m_global_lock = NULL;
}
const TrexRpcServerConfig *m_rpc_req_resp_cfg;
- const TrexRpcServerConfig *m_rpc_async_cfg;
const TrexPlatformApi *m_platform_api;
bool m_rpc_server_verbose;
uint8_t m_port_count;
TrexPublisher *m_publisher;
+ std::mutex *m_global_lock;
+
};
/**
@@ -186,7 +187,6 @@ protected:
TrexPublisher *m_publisher;
- std::mutex m_global_cp_lock;
};
/**
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index c60b0e85..7302e05d 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -48,11 +48,35 @@ port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &core
using namespace std;
+
+
+/***************************
+ * trex DP events handlers
+ *
+ **************************/
+class AsyncStopEvent : public TrexDpPortEvent {
+
+protected:
+ /**
+ * when an async event occurs (all cores have reported in)
+ *
+ * @author imarom (29-Feb-16)
+ */
+ virtual void on_event() {
+ get_port()->change_state(TrexStatelessPort::PORT_STATE_STREAMS);
+
+ get_port()->common_port_stop_actions(true);
+
+ assert(get_port()->m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID);
+ get_port()->m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
+ }
+};
+
/***************************
* trex stateless port
*
**************************/
-TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) {
+TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) {
std::vector<std::pair<uint8_t, uint8_t>> core_pair_list;
m_port_id = port_id;
@@ -73,16 +97,20 @@ TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api
m_cores_id_list.push_back(core_pair.first);
}
- /* init the events DP DB */
- m_dp_events.create(this);
-
m_graph_obj = NULL;
+
+ m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
}
TrexStatelessPort::~TrexStatelessPort() {
if (m_graph_obj) {
delete m_graph_obj;
}
+
+ if (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) {
+ m_dp_events.destroy_event(m_pending_async_stop_event);
+ m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
+ }
}
/**
@@ -170,16 +198,14 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
}
/* generate a message to all the relevant DP cores to start transmitting */
- int event_id = m_dp_events.generate_event_id();
-
- /* mark that DP event of stoppped is possible */
- m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, event_id);
-
+ assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID);
+ m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent());
/* update object status */
m_factor = factor;
m_last_all_streams_continues = compiled_objs[0]->get_all_streams_continues();
m_last_duration = duration;
+
change_state(PORT_STATE_TX);
/* update the DP - messages will be freed by the DP */
@@ -188,17 +214,23 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
/* was the core assigned a compiled object ? */
if (compiled_objs[index]) {
- TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_objs[index], duration);
+ TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id,
+ m_pending_async_stop_event,
+ compiled_objs[index],
+ duration);
send_message_to_dp(core_id, start_msg);
} else {
/* mimic an end event */
- m_dp_events.handle_event(TrexDpPortEvent::EVENT_STOP, core_id, event_id);
+ m_dp_events.on_core_reporting_in(m_pending_async_stop_event, core_id);
}
index++;
}
+ /* for debug - this can be turn on */
+ //m_dp_events.barrier();
+
/* update subscribers */
Json::Value data;
data["port_id"] = m_port_id;
@@ -225,17 +257,23 @@ TrexStatelessPort::stop_traffic(void) {
/* delete any previous graphs */
delete_streams_graph();
- /* mask out the DP stop event */
- m_dp_events.disable(TrexDpPortEvent::EVENT_STOP);
+ /* to avoid race, first destroy any previous stop/pause events */
+ if (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) {
+ m_dp_events.destroy_event(m_pending_async_stop_event);
+ m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
+ }
+
/* generate a message to all the relevant DP cores to start transmitting */
TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id);
-
send_message_to_all_dp(stop_msg);
- /* continue to general actions */
+ /* a barrier - make sure all the DP cores stopped */
+ m_dp_events.barrier();
+
+ change_state(PORT_STATE_STREAMS);
+
common_port_stop_actions(false);
-
}
/**
@@ -243,14 +281,12 @@ TrexStatelessPort::stop_traffic(void) {
*
*/
void
-TrexStatelessPort::common_port_stop_actions(bool event_triggered) {
+TrexStatelessPort::common_port_stop_actions(bool async) {
- change_state(PORT_STATE_STREAMS);
-
Json::Value data;
data["port_id"] = m_port_id;
- if (event_triggered) {
+ if (async) {
get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_FINISHED_TX, data);
} else {
get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data);
@@ -274,10 +310,14 @@ TrexStatelessPort::pause_traffic(void) {
throw TrexException(" pause is supported when duration is not enable is start command ");
}
+ /* send a pause message */
TrexStatelessCpToDpMsgBase *pause_msg = new TrexStatelessDpPause(m_port_id);
-
send_message_to_all_dp(pause_msg);
+ /* make sure all DP cores paused */
+ m_dp_events.barrier();
+
+ /* change state */
change_state(PORT_STATE_PAUSE);
Json::Value data;
@@ -285,6 +325,7 @@ TrexStatelessPort::pause_traffic(void) {
get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_PAUSED, data);
}
+
void
TrexStatelessPort::resume_traffic(void) {
@@ -294,7 +335,6 @@ TrexStatelessPort::resume_traffic(void) {
TrexStatelessCpToDpMsgBase *resume_msg = new TrexStatelessDpResume(m_port_id);
send_message_to_all_dp(resume_msg);
-
change_state(PORT_STATE_TX);
@@ -335,7 +375,6 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul, bool force) {
}
TrexStatelessCpToDpMsgBase *update_msg = new TrexStatelessDpUpdate(m_port_id, factor);
-
send_message_to_all_dp(update_msg);
m_factor *= factor;
@@ -439,25 +478,6 @@ TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBas
ring->Enqueue((CGenNode *)msg);
}
-/**
- * when a DP (async) event occurs - handle it
- *
- */
-void
-TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) {
- Json::Value data;
-
- switch (event_type) {
-
- case TrexDpPortEvent::EVENT_STOP:
- common_port_stop_actions(true);
- break;
-
- default:
- assert(0);
-
- }
-}
uint64_t
TrexStatelessPort::get_port_speed_bps() const {
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 192d0d19..d3c4dcb9 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -102,13 +102,17 @@ private:
};
+class AsyncStopEvent;
+
/**
* describes a stateless port
*
* @author imarom (31-Aug-15)
*/
class TrexStatelessPort {
- friend class TrexDpPortEvent;
+ friend TrexDpPortEvents;
+ friend TrexDpPortEvent;
+ friend AsyncStopEvent;
public:
@@ -363,18 +367,12 @@ private:
*/
void send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg);
- /**
- * triggered when event occurs
- *
- */
- void on_dp_event_occured(TrexDpPortEvent::event_e event_type);
-
/**
* when a port stops, perform various actions
*
*/
- void common_port_stop_actions(bool event_triggered);
+ void common_port_stop_actions(bool async);
/**
* calculate effective M per core
@@ -421,6 +419,8 @@ private:
/* owner information */
TrexPortOwner m_owner;
+
+ int m_pending_async_stop_event;
};
diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h
index 161e9592..1abf0c04 100644
--- a/src/stateless/cp/trex_stream.h
+++ b/src/stateless/cp/trex_stream.h
@@ -514,7 +514,7 @@ public:
bool m_seq_enabled;
bool m_latency;
uint32_t m_user_id;
-
+ uint16_t m_hw_id;
} m_rx_check;
uint32_t m_burst_total_pkts; /* valid in case of burst stSINGLE_BURST,stMULTI_BURST*/
diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp
index 7c91754c..be5002da 100644
--- a/src/stateless/cp/trex_streams_compiler.cpp
+++ b/src/stateless/cp/trex_streams_compiler.cpp
@@ -476,7 +476,8 @@ TrexStreamsCompiler::compile_stream(TrexStream *stream,
}
TrexStream *fixed_rx_flow_stat_stream = stream->clone(true);
- get_stateless_obj()->m_rx_flow_stat.start_stream(fixed_rx_flow_stat_stream);
+
+ get_stateless_obj()->m_rx_flow_stat.start_stream(fixed_rx_flow_stat_stream, fixed_rx_flow_stat_stream->m_rx_check.m_hw_id); //???? check for errors
/* can this stream be split to many cores ? */
if (!stream->is_splitable(dp_core_count)) {