summaryrefslogtreecommitdiffstats
path: root/src/stateless
diff options
context:
space:
mode:
authorYaroslav Brustinov <ybrustin@cisco.com>2015-12-13 17:18:02 +0200
committerYaroslav Brustinov <ybrustin@cisco.com>2015-12-13 17:18:02 +0200
commit9738e267d806223ee25e013b5959ccac26c1a14a (patch)
tree590c8f329f2ab68c7da3f1f8f4c55f81243a08bc /src/stateless
parenta573adc6395c9ad8d96978508a07a654ef48c7a9 (diff)
parent301341ddb1bf17387d7fea19667bedd40fce4509 (diff)
Merge branch 'master' into get_logs_and_version
Diffstat (limited to 'src/stateless')
-rw-r--r--src/stateless/cp/trex_dp_port_events.cpp220
-rw-r--r--src/stateless/cp/trex_dp_port_events.h171
-rw-r--r--src/stateless/cp/trex_stateless.cpp164
-rw-r--r--src/stateless/cp/trex_stateless.h83
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp607
-rw-r--r--src/stateless/cp/trex_stateless_port.h396
-rw-r--r--src/stateless/cp/trex_stream.cpp93
-rw-r--r--src/stateless/cp/trex_stream.h199
-rw-r--r--src/stateless/cp/trex_streams_compiler.cpp746
-rw-r--r--src/stateless/cp/trex_streams_compiler.h229
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp689
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h260
-rw-r--r--src/stateless/dp/trex_stream_node.h284
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp191
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h319
15 files changed, 4130 insertions, 521 deletions
diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp
new file mode 100644
index 00000000..ba327e59
--- /dev/null
+++ b/src/stateless/cp/trex_dp_port_events.cpp
@@ -0,0 +1,220 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include <trex_dp_port_events.h>
+#include <sstream>
+#include <os_time.h>
+#include <trex_stateless.h>
+
+/**
+ * port events
+ */
+void
+TrexDpPortEvents::create(TrexStatelessPort *port) {
+ m_port = port;
+
+ for (int i = 0; i < TrexDpPortEvent::EVENT_MAX; i++) {
+ m_events[i].create((TrexDpPortEvent::event_e) i, port);
+ }
+
+ m_event_id_counter = EVENT_ID_INVALID;
+}
+
+/**
+ * generate a new event ID
+ *
+ */
+int
+TrexDpPortEvents::generate_event_id() {
+ return (++m_event_id_counter);
+}
+
+/**
+ * mark the next allowed event
+ * all other events will be disabled
+ *
+ */
+void
+TrexDpPortEvents::wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms) {
+
+ /* first disable all events */
+ for (TrexDpPortEvent & e : m_events) {
+ e.disable();
+ }
+
+ /* mark this event as allowed */
+ m_events[ev].wait_for_event(event_id, timeout_ms);
+}
+
+void
+TrexDpPortEvents::disable(TrexDpPortEvent::event_e ev) {
+ m_events[ev].disable();
+}
+
+/**
+ * handle an event
+ *
+ */
+void
+TrexDpPortEvents::handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id) {
+ m_events[ev].handle_event(thread_id, event_id);
+}
+
+/***********
+ * single event object
+ *
+ */
+
+void
+TrexDpPortEvent::create(event_e type, TrexStatelessPort *port) {
+ m_event_type = type;
+ m_port = port;
+
+ /* add the core ids to the hash */
+ m_signal.clear();
+ for (int core_id : m_port->get_core_id_list()) {
+ m_signal[core_id] = false;
+ }
+
+ /* event is disabled */
+ disable();
+}
+
+
+/**
+ * wait the event using event id and timeout
+ *
+ */
+void
+TrexDpPortEvent::wait_for_event(int event_id, int timeout_ms) {
+
+ /* set a new event id */
+ m_event_id = event_id;
+
+ /* do we have a timeout ? */
+ if (timeout_ms > 0) {
+ m_expire_limit_ms = os_get_time_msec() + timeout_ms;
+ } else {
+ m_expire_limit_ms = -1;
+ }
+
+ /* prepare the signal array */
+ m_pending_cnt = 0;
+ for (auto & core_pair : m_signal) {
+ core_pair.second = false;
+ m_pending_cnt++;
+ }
+}
+
+void
+TrexDpPortEvent::disable() {
+ m_event_id = TrexDpPortEvents::EVENT_ID_INVALID;
+}
+
+/**
+ * get the event status
+ *
+ */
+
+TrexDpPortEvent::event_status_e
+TrexDpPortEvent::status() {
+
+ /* is it even active ? */
+ if (m_event_id == TrexDpPortEvents::EVENT_ID_INVALID) {
+ return (EVENT_DISABLE);
+ }
+
+ /* did it occured ? */
+ if (m_pending_cnt == 0) {
+ return (EVENT_OCCURED);
+ }
+
+ /* so we are enabled and the event did not occur - maybe we timed out ? */
+ if ( (m_expire_limit_ms > 0) && (os_get_time_msec() > m_expire_limit_ms) ) {
+ return (EVENT_TIMED_OUT);
+ }
+
+ /* so we are still waiting... */
+ return (EVENT_PENDING);
+
+}
+
+void
+TrexDpPortEvent::err(int thread_id, int event_id, const std::string &err_msg) {
+ std::stringstream err;
+ err << "DP event '" << event_name(m_event_type) << "' on thread id '" << thread_id << "' with key '" << event_id <<"' - ";
+}
+
+/**
+ * event occured
+ *
+ */
+void
+TrexDpPortEvent::handle_event(int thread_id, int event_id) {
+
+ /* if the event is disabled - we don't care */
+ if (!is_active()) {
+ return;
+ }
+
+ /* check the event id is matching the required event - if not maybe its an old signal */
+ if (event_id != m_event_id) {
+ return;
+ }
+
+ /* mark sure no double signal */
+ if (m_signal.at(thread_id)) {
+ err(thread_id, event_id, "double signal");
+
+ } else {
+ /* mark */
+ m_signal.at(thread_id) = true;
+ m_pending_cnt--;
+ }
+
+ /* event occured */
+ if (m_pending_cnt == 0) {
+ m_port->on_dp_event_occured(m_event_type);
+ m_event_id = TrexDpPortEvents::EVENT_ID_INVALID;
+ }
+}
+
+bool
+TrexDpPortEvent::is_active() {
+ return (status() != EVENT_DISABLE);
+}
+
+bool
+TrexDpPortEvent::has_timeout_expired() {
+ return (status() == EVENT_TIMED_OUT);
+}
+
+const char *
+TrexDpPortEvent::event_name(event_e type) {
+ switch (type) {
+ case EVENT_STOP:
+ return "DP STOP";
+
+ default:
+ throw TrexException("unknown event type");
+ }
+
+}
diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h
new file mode 100644
index 00000000..557e590b
--- /dev/null
+++ b/src/stateless/cp/trex_dp_port_events.h
@@ -0,0 +1,171 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#ifndef __TREX_DP_PORT_EVENTS_H__
+#define __TREX_DP_PORT_EVENTS_H__
+
+#include <unordered_map>
+#include <string>
+
+class TrexStatelessPort;
+
+/**
+ * describes a single DP event related to port
+ *
+ * @author imarom (18-Nov-15)
+ */
+class TrexDpPortEvent {
+public:
+
+ enum event_e {
+ EVENT_STOP = 1,
+ EVENT_MAX
+ };
+
+ /**
+ * status of the event for the port
+ */
+ enum event_status_e {
+ EVENT_DISABLE,
+ EVENT_PENDING,
+ EVENT_TIMED_OUT,
+ EVENT_OCCURED
+ };
+
+ /**
+ * init for the event
+ *
+ */
+ void create(event_e type, TrexStatelessPort *port);
+
+ /**
+ * create a new pending event
+ *
+ */
+ void wait_for_event(int event_id, int timeout_ms = -1);
+
+ /**
+ * mark event as not allowed to happen
+ *
+ */
+ void disable();
+
+ /**
+ * get the event status
+ *
+ */
+ event_status_e status();
+
+ /**
+ * event occured
+ *
+ */
+ void handle_event(int thread_id, int event_id);
+
+ /**
+ * returns true if event is active
+ *
+ */
+ bool is_active();
+
+ /**
+ * has timeout already expired ?
+ *
+ */
+ bool has_timeout_expired();
+
+ /**
+ * generate error
+ *
+ */
+ void err(int thread_id, int event_id, const std::string &err_msg);
+
+ /**
+ * event to name
+ *
+ */
+ static const char * event_name(event_e type);
+
+
+private:
+
+ event_e m_event_type;
+ std::unordered_map<int, bool> m_signal;
+ int m_pending_cnt;
+
+ TrexStatelessPort *m_port;
+ int m_event_id;
+ int m_expire_limit_ms;
+
+};
+
+/**
+ * all the events related to a port
+ *
+ */
+class TrexDpPortEvents {
+public:
+ friend class TrexDpPortEvent;
+
+ void create(TrexStatelessPort *port);
+
+ /**
+ * generate a new event ID to be used with wait_for_event
+ *
+ */
+ int generate_event_id();
+
+ /**
+ * wait a new DP event on the port
+ * returns a key which will be used to identify
+ * the event happened
+ *
+ * @author imarom (18-Nov-15)
+ *
+ * @param ev - type of event
+ * @param event_id - a unique identifier for the event
+ * @param timeout_ms - does it has a timeout ?
+ *
+ */
+ void wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms = -1);
+
+ /**
+ * disable an event (don't care)
+ *
+ */
+ void disable(TrexDpPortEvent::event_e ev);
+
+ /**
+ * event has occured
+ *
+ */
+ void handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id);
+
+private:
+ static const int EVENT_ID_INVALID = -1;
+
+ TrexDpPortEvent m_events[TrexDpPortEvent::EVENT_MAX];
+ int m_event_id_counter;
+
+ TrexStatelessPort *m_port;
+
+};
+
+#endif /* __TREX_DP_PORT_EVENTS_H__ */
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp
index 72762e26..a4522837 100644
--- a/src/stateless/cp/trex_stateless.cpp
+++ b/src/stateless/cp/trex_stateless.cpp
@@ -31,55 +31,60 @@ using namespace std;
* Trex stateless object
*
**********************************************************/
-TrexStateless::TrexStateless() {
- m_is_configured = false;
-}
-
/**
- * configure the singleton stateless object
*
*/
-void TrexStateless::configure(const TrexStatelessCfg &cfg) {
-
- TrexStateless& instance = get_instance_internal();
-
- /* check status */
- if (instance.m_is_configured) {
- throw TrexException("re-configuration of stateless object is not allowed");
- }
+TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) {
/* create RPC servers */
/* set both servers to mutex each other */
- instance.m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_rpc_async_cfg, &instance.m_global_cp_lock);
- instance.m_rpc_server->set_verbose(cfg.m_rpc_server_verbose);
+ m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_rpc_async_cfg, &m_global_cp_lock);
+ m_rpc_server->set_verbose(cfg.m_rpc_server_verbose);
/* configure ports */
+ m_port_count = cfg.m_port_count;
- instance.m_port_count = cfg.m_port_count;
-
- for (int i = 0; i < instance.m_port_count; i++) {
- instance.m_ports.push_back(new TrexStatelessPort(i));
+ for (int i = 0; i < m_port_count; i++) {
+ m_ports.push_back(new TrexStatelessPort(i, cfg.m_platform_api));
}
- /* cores */
- instance.m_dp_core_count = cfg.m_dp_core_count;
- for (int i = 0; i < instance.m_dp_core_count; i++) {
- instance.m_dp_cores.push_back(new TrexStatelessDpCore(i));
+ m_platform_api = cfg.m_platform_api;
+ m_publisher = cfg.m_publisher;
+
+}
+
+/**
+ * release all memory
+ *
+ * @author imarom (08-Oct-15)
+ */
+TrexStateless::~TrexStateless() {
+
+ /* release memory for ports */
+ for (auto port : m_ports) {
+ delete port;
}
+ m_ports.clear();
- /* done */
- instance.m_is_configured = true;
+ /* stops the RPC server */
+ m_rpc_server->stop();
+ delete m_rpc_server;
+
+ m_rpc_server = NULL;
+
+ delete m_platform_api;
+ m_platform_api = NULL;
}
+
/**
* starts the control plane side
*
*/
void
TrexStateless::launch_control_plane() {
- //std::cout << "\n on control/master core \n";
/* pin this process to the current running CPU
any new thread will be called on the same CPU
@@ -94,39 +99,6 @@ TrexStateless::launch_control_plane() {
m_rpc_server->start();
}
-void
-TrexStateless::launch_on_dp_core(uint8_t core_id) {
- m_dp_cores[core_id - 1]->run();
-}
-
-/**
- * destroy the singleton and release all memory
- *
- * @author imarom (08-Oct-15)
- */
-void
-TrexStateless::destroy() {
- TrexStateless& instance = get_instance_internal();
-
- if (!instance.m_is_configured) {
- return;
- }
-
- /* release memory for ports */
- for (auto port : instance.m_ports) {
- delete port;
- }
- instance.m_ports.clear();
-
- /* stops the RPC server */
- instance.m_rpc_server->stop();
- delete instance.m_rpc_server;
-
- instance.m_rpc_server = NULL;
-
- /* done */
- instance.m_is_configured = false;
-}
/**
* fetch a port by ID
@@ -148,57 +120,32 @@ TrexStateless::get_port_count() {
uint8_t
TrexStateless::get_dp_core_count() {
- return m_dp_core_count;
-}
-
-void
-TrexStateless::update_stats() {
-
- /* update CPU util.
- TODO
- */
- m_stats.m_stats.m_cpu_util = 0;
-
- /* for every port update and accumulate */
- for (uint8_t i = 0; i < m_port_count; i++) {
- m_ports[i]->update_stats();
-
- const TrexPortStats & port_stats = m_ports[i]->get_stats();
-
- m_stats.m_stats.m_tx_bps += port_stats.m_stats.m_tx_bps;
- m_stats.m_stats.m_rx_bps += port_stats.m_stats.m_rx_bps;
-
- m_stats.m_stats.m_tx_pps += port_stats.m_stats.m_tx_pps;
- m_stats.m_stats.m_rx_pps += port_stats.m_stats.m_rx_pps;
-
- m_stats.m_stats.m_total_tx_pkts += port_stats.m_stats.m_total_tx_pkts;
- m_stats.m_stats.m_total_rx_pkts += port_stats.m_stats.m_total_rx_pkts;
-
- m_stats.m_stats.m_total_tx_bytes += port_stats.m_stats.m_total_tx_bytes;
- m_stats.m_stats.m_total_rx_bytes += port_stats.m_stats.m_total_rx_bytes;
-
- m_stats.m_stats.m_tx_rx_errors += port_stats.m_stats.m_tx_rx_errors;
- }
+ return m_platform_api->get_dp_core_count();
}
void
TrexStateless::encode_stats(Json::Value &global) {
- global["cpu_util"] = m_stats.m_stats.m_cpu_util;
+ const TrexPlatformApi *api = get_stateless_obj()->get_platform_api();
+
+ TrexPlatformGlobalStats stats;
+ api->get_global_stats(stats);
- global["tx_bps"] = m_stats.m_stats.m_tx_bps;
- global["rx_bps"] = m_stats.m_stats.m_rx_bps;
+ global["cpu_util"] = stats.m_stats.m_cpu_util;
- global["tx_pps"] = m_stats.m_stats.m_tx_pps;
- global["rx_pps"] = m_stats.m_stats.m_rx_pps;
+ global["tx_bps"] = stats.m_stats.m_tx_bps;
+ global["rx_bps"] = stats.m_stats.m_rx_bps;
- global["total_tx_pkts"] = Json::Value::UInt64(m_stats.m_stats.m_total_tx_pkts);
- global["total_rx_pkts"] = Json::Value::UInt64(m_stats.m_stats.m_total_rx_pkts);
+ global["tx_pps"] = stats.m_stats.m_tx_pps;
+ global["rx_pps"] = stats.m_stats.m_rx_pps;
- global["total_tx_bytes"] = Json::Value::UInt64(m_stats.m_stats.m_total_tx_bytes);
- global["total_rx_bytes"] = Json::Value::UInt64(m_stats.m_stats.m_total_rx_bytes);
+ global["total_tx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_tx_pkts);
+ global["total_rx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_rx_pkts);
- global["tx_rx_errors"] = Json::Value::UInt64(m_stats.m_stats.m_tx_rx_errors);
+ global["total_tx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_tx_bytes);
+ global["total_rx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_rx_bytes);
+
+ global["tx_rx_errors"] = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors);
for (uint8_t i = 0; i < m_port_count; i++) {
std::stringstream ss;
@@ -210,3 +157,20 @@ TrexStateless::encode_stats(Json::Value &global) {
}
}
+/**
+ * generate a snapshot for publish (async publish)
+ *
+ */
+void
+TrexStateless::generate_publish_snapshot(std::string &snapshot) {
+ Json::FastWriter writer;
+ Json::Value root;
+
+ root["name"] = "trex-stateless-info";
+ root["type"] = 0;
+
+ /* stateless specific info goes here */
+ root["data"] = Json::nullValue;
+
+ snapshot = writer.write(root);
+}
diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h
index 649b25dd..5c11be1e 100644
--- a/src/stateless/cp/trex_stateless.h
+++ b/src/stateless/cp/trex_stateless.h
@@ -29,8 +29,10 @@ limitations under the License.
#include <trex_stream.h>
#include <trex_stateless_port.h>
-#include <trex_stateless_dp_core.h>
#include <trex_rpc_server_api.h>
+#include <publisher/trex_publisher.h>
+
+#include <internal_api/trex_platform_api.h>
/**
* generic exception for errors
@@ -88,17 +90,19 @@ public:
/* default values */
TrexStatelessCfg() {
m_port_count = 0;
- m_dp_core_count = 0;
m_rpc_req_resp_cfg = NULL;
m_rpc_async_cfg = NULL;
- m_rpc_server_verbose = false;
+ m_rpc_server_verbose = false;
+ m_platform_api = NULL;
+ m_publisher = NULL;
}
const TrexRpcServerConfig *m_rpc_req_resp_cfg;
const TrexRpcServerConfig *m_rpc_async_cfg;
+ const TrexPlatformApi *m_platform_api;
bool m_rpc_server_verbose;
uint8_t m_port_count;
- uint8_t m_dp_core_count;
+ TrexPublisher *m_publisher;
};
/**
@@ -113,27 +117,8 @@ public:
* reconfiguration is not allowed
* an exception will be thrown
*/
- static void configure(const TrexStatelessCfg &cfg);
-
- /**
- * destroy the instance
- *
- */
- static void destroy();
-
- /**
- * singleton public get instance
- *
- */
- static TrexStateless& get_instance() {
- TrexStateless& instance = get_instance_internal();
-
- if (!instance.m_is_configured) {
- throw TrexException("object is not configured");
- }
-
- return instance;
- }
+ TrexStateless(const TrexStatelessCfg &cfg);
+ ~TrexStateless();
/**
* starts the control plane side
@@ -152,12 +137,6 @@ public:
uint8_t get_dp_core_count();
- /**
- * update all the stats (deep update)
- * (include all the ports and global stats)
- *
- */
- void update_stats();
/**
* fetch all the stats
@@ -165,22 +144,29 @@ public:
*/
void encode_stats(Json::Value &global);
+ /**
+ * generate a snapshot for publish
+ */
+ void generate_publish_snapshot(std::string &snapshot);
-protected:
- TrexStateless();
+ const TrexPlatformApi * get_platform_api() {
+ return (m_platform_api);
+ }
- static TrexStateless& get_instance_internal () {
- static TrexStateless instance;
- return instance;
+ TrexPublisher * get_publisher() {
+ return m_publisher;
}
- /* c++ 2011 style singleton */
+ const std::vector <TrexStatelessPort *> get_port_list() {
+ return m_ports;
+ }
+
+protected:
+
+ /* no copy or assignment */
TrexStateless(TrexStateless const&) = delete;
void operator=(TrexStateless const&) = delete;
- /* status */
- bool m_is_configured;
-
/* RPC server array */
TrexRpcServer *m_rpc_server;
@@ -188,15 +174,22 @@ protected:
std::vector <TrexStatelessPort *> m_ports;
uint8_t m_port_count;
- /* cores */
- std::vector <TrexStatelessDpCore *> m_dp_cores;
- uint8_t m_dp_core_count;
+ /* platform API */
+ const TrexPlatformApi *m_platform_api;
- /* stats */
- TrexStatelessStats m_stats;
+ TrexPublisher *m_publisher;
std::mutex m_global_cp_lock;
};
+/**
+ * an anchor function
+ *
+ * @author imarom (25-Oct-15)
+ *
+ * @return TrexStateless&
+ */
+TrexStateless * get_stateless_obj();
+
#endif /* __TREX_STATELESS_H__ */
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index a31847a5..9770c735 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -18,211 +18,600 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
+
#include <trex_stateless.h>
#include <trex_stateless_port.h>
+#include <trex_stateless_messaging.h>
+#include <trex_streams_compiler.h>
+
#include <string>
#ifndef TREX_RPC_MOCK_SERVER
+
// DPDK c++ issue
-#define UINT8_MAX 255
-#define UINT16_MAX 0xFFFF
+#ifndef UINT8_MAX
+ #define UINT8_MAX 255
+#endif
+
+#ifndef UINT16_MAX
+ #define UINT16_MAX 0xFFFF
+#endif
+
// DPDK c++ issue
#endif
#include <rte_ethdev.h>
#include <os_time.h>
+void
+port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list);
+
using namespace std;
/***************************
* trex stateless port
*
**************************/
-TrexStatelessPort::TrexStatelessPort(uint8_t port_id) : m_port_id(port_id) {
- m_port_state = PORT_STATE_UP_IDLE;
- clear_owner();
+TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) {
+ std::vector<std::pair<uint8_t, uint8_t>> core_pair_list;
+
+ m_port_id = port_id;
+ m_port_state = PORT_STATE_IDLE;
+
+ /* get the platform specific data */
+ api->get_interface_info(port_id, m_driver_name, m_speed);
+
+ /* get the DP cores belonging to this port */
+ api->port_id_to_cores(m_port_id, core_pair_list);
+
+ for (auto core_pair : core_pair_list) {
+
+ /* send the core id */
+ m_cores_id_list.push_back(core_pair.first);
+ }
+
+ /* init the events DP DB */
+ m_dp_events.create(this);
}
/**
- * starts the traffic on the port
+ * acquire the port
*
+ * @author imarom (09-Nov-15)
+ *
+ * @param user
+ * @param force
*/
-TrexStatelessPort::rc_e
-TrexStatelessPort::start_traffic(void) {
+void
+TrexStatelessPort::acquire(const std::string &user, bool force) {
- if (m_port_state != PORT_STATE_UP_IDLE) {
- return (RC_ERR_BAD_STATE_FOR_OP);
+ /* if port is free - just take it */
+ if (get_owner().is_free()) {
+ get_owner().own(user);
+ return;
}
- if (get_stream_table()->size() == 0) {
- return (RC_ERR_NO_STREAMS);
+ if (force) {
+ get_owner().own(user);
+
+ /* inform the other client of the steal... */
+ Json::Value data;
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_FORCE_ACQUIRED, data);
+
+ } else {
+ /* not same user or session id and not force - report error */
+ if (get_owner().get_name() == user) {
+ throw TrexRpcException("port is already owned by another session of '" + user + "'");
+ } else {
+ throw TrexRpcException("port is already taken by '" + get_owner().get_name() + "'");
+ }
}
- m_port_state = PORT_STATE_TRANSMITTING;
+}
- /* real code goes here */
- return (RC_OK);
+void
+TrexStatelessPort::release(void) {
+ get_owner().release();
}
-void
-TrexStatelessPort::stop_traffic(void) {
+/**
+ * starts the traffic on the port
+ *
+ */
+void
+TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration) {
+
+ /* command allowed only on state stream */
+ verify_state(PORT_STATE_STREAMS);
+
+ /* just making sure no leftovers... */
+ delete_streams_graph();
+
+ /* on start - we can only provide absolute values */
+ assert(mul.m_op == TrexPortMultiplier::OP_ABS);
+
+ double factor = calculate_effective_factor(mul);
+
+ /* fetch all the streams from the table */
+ vector<TrexStream *> streams;
+ get_object_list(streams);
- /* real code goes here */
- if (m_port_state == PORT_STATE_TRANSMITTING) {
- m_port_state = PORT_STATE_UP_IDLE;
+
+ /* compiler it */
+ std::vector<TrexStreamsCompiledObj *> compiled_objs;
+ std::string fail_msg;
+
+ TrexStreamsCompiler compiler;
+ bool rc = compiler.compile(m_port_id,
+ streams,
+ compiled_objs,
+ get_dp_core_count(),
+ factor,
+ &fail_msg);
+ if (!rc) {
+ throw TrexRpcException(fail_msg);
}
+
+ /* generate a message to all the relevant DP cores to start transmitting */
+ int event_id = m_dp_events.generate_event_id();
+
+ /* mark that DP event of stoppped is possible */
+ m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, event_id);
+
+
+ /* update object status */
+ m_factor = factor;
+ m_last_all_streams_continues = compiled_objs[0]->get_all_streams_continues();
+ m_last_duration = duration;
+ change_state(PORT_STATE_TX);
+
+
+ /* update the DP - messages will be freed by the DP */
+ int index = 0;
+ for (auto core_id : m_cores_id_list) {
+
+ TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_objs[index], duration);
+ send_message_to_dp(core_id, start_msg);
+
+ index++;
+ }
+
+ /* update subscribers */
+ Json::Value data;
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data);
+
}
+
/**
-* access the stream table
-*
-*/
-TrexStreamTable * TrexStatelessPort::get_stream_table() {
- return &m_stream_table;
+ * stop traffic on port
+ *
+ * @author imarom (09-Nov-15)
+ *
+ * @return TrexStatelessPort::rc_e
+ */
+void
+TrexStatelessPort::stop_traffic(void) {
+
+ if (!( (m_port_state == PORT_STATE_TX)
+ || (m_port_state == PORT_STATE_PAUSE) )) {
+ return;
+ }
+
+ /* delete any previous graphs */
+ delete_streams_graph();
+
+ /* mask out the DP stop event */
+ m_dp_events.disable(TrexDpPortEvent::EVENT_STOP);
+
+ /* generate a message to all the relevant DP cores to start transmitting */
+ TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id);
+
+ send_message_to_all_dp(stop_msg);
+
+ change_state(PORT_STATE_STREAMS);
+
+ Json::Value data;
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data);
+
+}
+
+void
+TrexStatelessPort::pause_traffic(void) {
+
+ verify_state(PORT_STATE_TX);
+
+ if (m_last_all_streams_continues == false) {
+ throw TrexRpcException(" pause is supported when all streams are in continues mode ");
+ }
+
+ if ( m_last_duration>0.0 ) {
+ throw TrexRpcException(" pause is supported when duration is not enable is start command ");
+ }
+
+ TrexStatelessCpToDpMsgBase *pause_msg = new TrexStatelessDpPause(m_port_id);
+
+ send_message_to_all_dp(pause_msg);
+
+ change_state(PORT_STATE_PAUSE);
+
+ Json::Value data;
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_PAUSED, data);
+}
+
+void
+TrexStatelessPort::resume_traffic(void) {
+
+ verify_state(PORT_STATE_PAUSE);
+
+ /* generate a message to all the relevant DP cores to start transmitting */
+ TrexStatelessCpToDpMsgBase *resume_msg = new TrexStatelessDpResume(m_port_id);
+
+ send_message_to_all_dp(resume_msg);
+
+ change_state(PORT_STATE_TX);
+
+
+ Json::Value data;
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_RESUMED, data);
}
+void
+TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul) {
+
+ double factor;
+
+ verify_state(PORT_STATE_TX | PORT_STATE_PAUSE);
+
+ /* generate a message to all the relevant DP cores to start transmitting */
+ double new_factor = calculate_effective_factor(mul);
+
+ switch (mul.m_op) {
+ case TrexPortMultiplier::OP_ABS:
+ factor = new_factor / m_factor;
+ break;
+
+ case TrexPortMultiplier::OP_ADD:
+ factor = (m_factor + new_factor) / m_factor;
+ break;
+
+ case TrexPortMultiplier::OP_SUB:
+ factor = (m_factor - new_factor) / m_factor;
+ if (factor <= 0) {
+ throw TrexRpcException("Update request will lower traffic to less than zero");
+ }
+ break;
+
+ default:
+ assert(0);
+ break;
+ }
+
+ TrexStatelessCpToDpMsgBase *update_msg = new TrexStatelessDpUpdate(m_port_id, factor);
+
+ send_message_to_all_dp(update_msg);
+
+ m_factor *= factor;
+
+}
std::string
-TrexStatelessPort::get_state_as_string() {
+TrexStatelessPort::get_state_as_string() const {
switch (get_state()) {
case PORT_STATE_DOWN:
- return "down";
+ return "DOWN";
+
+ case PORT_STATE_IDLE:
+ return "IDLE";
- case PORT_STATE_UP_IDLE:
- return "idle";
+ case PORT_STATE_STREAMS:
+ return "STREAMS";
- case PORT_STATE_TRANSMITTING:
- return "transmitting";
+ case PORT_STATE_TX:
+ return "TX";
+
+ case PORT_STATE_PAUSE:
+ return "PAUSE";
}
- return "unknown";
+ return "UNKNOWN";
}
void
-TrexStatelessPort::get_properties(string &driver, string &speed) {
+TrexStatelessPort::get_properties(std::string &driver, TrexPlatformApi::driver_speed_e &speed) {
- /* take this from DPDK */
- driver = "e1000";
- speed = "1 Gbps";
+ driver = m_driver_name;
+ speed = m_speed;
}
+bool
+TrexStatelessPort::verify_state(int state, bool should_throw) const {
+ if ( (state & m_port_state) == 0 ) {
+ if (should_throw) {
+ throw TrexRpcException("command cannot be executed on current state: '" + get_state_as_string() + "'");
+ } else {
+ return false;
+ }
+ }
-/**
- * generate a random connection handler
- *
- */
-std::string
-TrexStatelessPort::generate_handler() {
- std::stringstream ss;
+ return true;
+}
- static const char alphanum[] =
- "0123456789"
- "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
- "abcdefghijklmnopqrstuvwxyz";
+void
+TrexStatelessPort::change_state(port_state_e new_state) {
- /* generate 8 bytes of random handler */
- for (int i = 0; i < 8; ++i) {
- ss << alphanum[rand() % (sizeof(alphanum) - 1)];
+ m_port_state = new_state;
+}
+
+
+void
+TrexStatelessPort::encode_stats(Json::Value &port) {
+
+ const TrexPlatformApi *api = get_stateless_obj()->get_platform_api();
+
+ TrexPlatformInterfaceStats stats;
+ api->get_interface_stats(m_port_id, stats);
+
+ port["tx_bps"] = stats.m_stats.m_tx_bps;
+ port["rx_bps"] = stats.m_stats.m_rx_bps;
+
+ port["tx_pps"] = stats.m_stats.m_tx_pps;
+ port["rx_pps"] = stats.m_stats.m_rx_pps;
+
+ port["total_tx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_tx_pkts);
+ port["total_rx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_rx_pkts);
+
+ port["total_tx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_tx_bytes);
+ port["total_rx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_rx_bytes);
+
+ port["tx_rx_errors"] = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors);
+}
+
+void
+TrexStatelessPort::send_message_to_all_dp(TrexStatelessCpToDpMsgBase *msg) {
+
+ for (auto core_id : m_cores_id_list) {
+ send_message_to_dp(core_id, msg->clone());
}
- return (ss.str());
+ /* original was not sent - delete it */
+ delete msg;
+}
+
+void
+TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg) {
+
+ /* send the message to the core */
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id);
+ ring->Enqueue((CGenNode *)msg);
}
/**
- * update stats for the port
+ * when a DP (async) event occurs - handle it
*
*/
void
-TrexStatelessPort::update_stats() {
- struct rte_eth_stats stats;
- rte_eth_stats_get(m_port_id, &stats);
+TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) {
+ Json::Value data;
- /* copy straight values */
- m_stats.m_stats.m_total_tx_bytes = stats.obytes;
- m_stats.m_stats.m_total_rx_bytes = stats.ibytes;
+ switch (event_type) {
- m_stats.m_stats.m_total_tx_pkts = stats.opackets;
- m_stats.m_stats.m_total_rx_pkts = stats.ipackets;
+ case TrexDpPortEvent::EVENT_STOP:
+ /* set a stop event */
+ change_state(PORT_STATE_STREAMS);
+ /* send a ZMQ event */
- /* calculate stats */
- m_stats.m_stats.m_tx_bps = m_stats.m_bw_tx_bps.add(stats.obytes);
- m_stats.m_stats.m_rx_bps = m_stats.m_bw_rx_bps.add(stats.ibytes);
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_FINISHED_TX, data);
+ break;
- m_stats.m_stats.m_tx_pps = m_stats.m_bw_tx_pps.add(stats.opackets);
- m_stats.m_stats.m_rx_pps = m_stats.m_bw_rx_pps.add(stats.ipackets);
+ default:
+ assert(0);
+ }
}
-const TrexPortStats &
-TrexStatelessPort::get_stats() {
- return m_stats;
+uint64_t
+TrexStatelessPort::get_port_speed_bps() const {
+ switch (m_speed) {
+ case TrexPlatformApi::SPEED_1G:
+ return (1LLU * 1000 * 1000 * 1000);
+
+ case TrexPlatformApi::SPEED_10G:
+ return (10LLU * 1000 * 1000 * 1000);
+
+ case TrexPlatformApi::SPEED_40G:
+ return (40LLU * 1000 * 1000 * 1000);
+
+ default:
+ return 0;
+ }
}
+double
+TrexStatelessPort::calculate_effective_factor(const TrexPortMultiplier &mul) {
+
+ /* for a simple factor request */
+ if (mul.m_type == TrexPortMultiplier::MUL_FACTOR) {
+ return (mul.m_value);
+ }
+
+ /* we now need the graph - generate it if we don't have it (happens once) */
+ if (!m_graph_obj) {
+ generate_streams_graph();
+ }
+
+ switch (mul.m_type) {
+ case TrexPortMultiplier::MUL_BPS:
+ return (mul.m_value / m_graph_obj->get_max_bps());
+
+ case TrexPortMultiplier::MUL_PPS:
+ return (mul.m_value / m_graph_obj->get_max_pps());
+
+ case TrexPortMultiplier::MUL_PERCENTAGE:
+ /* if abs percentage is from the line speed - otherwise its from the current speed */
+
+ if (mul.m_op == TrexPortMultiplier::OP_ABS) {
+ double required = (mul.m_value / 100.0) * get_port_speed_bps();
+ return (required / m_graph_obj->get_max_bps());
+ } else {
+ return (m_factor * (mul.m_value / 100.0));
+ }
+
+ default:
+ assert(0);
+ }
+
+}
+
+
void
-TrexStatelessPort::encode_stats(Json::Value &port) {
+TrexStatelessPort::generate_streams_graph() {
- port["tx_bps"] = m_stats.m_stats.m_tx_bps;
- port["rx_bps"] = m_stats.m_stats.m_rx_bps;
+ /* dispose of the old one */
+ if (m_graph_obj) {
+ delete_streams_graph();
+ }
- port["tx_pps"] = m_stats.m_stats.m_tx_pps;
- port["rx_pps"] = m_stats.m_stats.m_rx_pps;
+ /* fetch all the streams from the table */
+ vector<TrexStream *> streams;
+ get_object_list(streams);
- port["total_tx_pkts"] = Json::Value::UInt64(m_stats.m_stats.m_total_tx_pkts);
- port["total_rx_pkts"] = Json::Value::UInt64(m_stats.m_stats.m_total_rx_pkts);
+ TrexStreamsGraph graph;
+ m_graph_obj = graph.generate(streams);
+}
- port["total_tx_bytes"] = Json::Value::UInt64(m_stats.m_stats.m_total_tx_bytes);
- port["total_rx_bytes"] = Json::Value::UInt64(m_stats.m_stats.m_total_rx_bytes);
-
- port["tx_rx_errors"] = Json::Value::UInt64(m_stats.m_stats.m_tx_rx_errors);
+void
+TrexStatelessPort::delete_streams_graph() {
+ if (m_graph_obj) {
+ delete m_graph_obj;
+ m_graph_obj = NULL;
+ }
}
/***************************
- * BW measurement
+ * port multiplier
*
**************************/
-/* TODO: move this to a common place */
-BWMeasure::BWMeasure() {
- reset();
-}
+const std::initializer_list<std::string> TrexPortMultiplier::g_types = {"raw", "bps", "pps", "percentage"};
+const std::initializer_list<std::string> TrexPortMultiplier::g_ops = {"abs", "add", "sub"};
+
+TrexPortMultiplier::
+TrexPortMultiplier(const std::string &type_str, const std::string &op_str, double value) {
+ mul_type_e type;
+ mul_op_e op;
+
+ if (type_str == "raw") {
+ type = MUL_FACTOR;
+
+ } else if (type_str == "bps") {
+ type = MUL_BPS;
-void BWMeasure::reset(void) {
- m_start=false;
- m_last_time_msec=0;
- m_last_bytes=0;
- m_last_result=0.0;
-};
+ } else if (type_str == "pps") {
+ type = MUL_PPS;
+
+ } else if (type_str == "percentage") {
+ type = MUL_PERCENTAGE;
+ } else {
+ throw TrexException("bad type str: " + type_str);
+ }
+
+ if (op_str == "abs") {
+ op = OP_ABS;
+
+ } else if (op_str == "add") {
+ op = OP_ADD;
+
+ } else if (op_str == "sub") {
+ op = OP_SUB;
+
+ } else {
+ throw TrexException("bad op str: " + op_str);
+ }
+
+ m_type = type;
+ m_op = op;
+ m_value = value;
-double BWMeasure::calc_MBsec(uint32_t dtime_msec,
- uint64_t dbytes){
- double rate=0.000008*( ( (double)dbytes*(double)os_get_time_freq())/((double)dtime_msec) );
- return(rate);
}
-double BWMeasure::add(uint64_t size) {
- if ( false == m_start ) {
- m_start=true;
- m_last_time_msec = os_get_time_msec() ;
- m_last_bytes=size;
- return(0.0);
+const TrexStreamsGraphObj *
+TrexStatelessPort::validate(void) {
+
+ /* first compile the graph */
+
+ vector<TrexStream *> streams;
+ get_object_list(streams);
+
+ if (streams.size() == 0) {
+ throw TrexException("no streams attached to port");
+ }
+
+ TrexStreamsCompiler compiler;
+ std::vector<TrexStreamsCompiledObj *> compiled_objs;
+
+ std::string fail_msg;
+ bool rc = compiler.compile(m_port_id,
+ streams,
+ compiled_objs,
+ get_dp_core_count(),
+ 1.0,
+ &fail_msg);
+ if (!rc) {
+ throw TrexException(fail_msg);
+ }
+
+ for (auto obj : compiled_objs) {
+ delete obj;
}
- uint32_t ctime=os_get_time_msec();
- if ((ctime - m_last_time_msec) <os_get_time_freq() ) {
- return(m_last_result);
+ /* now create a stream graph */
+ if (!m_graph_obj) {
+ generate_streams_graph();
}
- uint32_t dtime_msec = ctime-m_last_time_msec;
- uint64_t dbytes = size - m_last_bytes;
+ return m_graph_obj;
+}
- m_last_time_msec = ctime;
- m_last_bytes = size;
- m_last_result= 0.5*calc_MBsec(dtime_msec,dbytes) +0.5*(m_last_result);
- return( m_last_result );
+/************* Trex Port Owner **************/
+
+TrexPortOwner::TrexPortOwner() {
+ m_is_free = true;
+
+ /* for handlers random generation */
+ srand(time(NULL));
}
+/**
+ * generate a random connection handler
+ *
+ */
+std::string
+TrexPortOwner::generate_handler() {
+ std::stringstream ss;
+
+ static const char alphanum[] =
+ "0123456789"
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+ "abcdefghijklmnopqrstuvwxyz";
+
+ /* generate 8 bytes of random handler */
+ for (int i = 0; i < 8; ++i) {
+ ss << alphanum[rand() % (sizeof(alphanum) - 1)];
+ }
+
+ return (ss.str());
+}
+const std::string TrexPortOwner::g_unowned_name = "<FREE>";
+const std::string TrexPortOwner::g_unowned_handler = "";
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 428d5aee..4988b46a 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -22,87 +22,103 @@ limitations under the License.
#define __TREX_STATELESS_PORT_H__
#include <trex_stream.h>
-
-/**
- * bandwidth measurement class
+#include <trex_dp_port_events.h>
+#include <internal_api/trex_platform_api.h>
+
+class TrexStatelessCpToDpMsgBase;
+class TrexStreamsGraphObj;
+class TrexPortMultiplier;
+
+/**
+ * TRex port owner can perform
+ * write commands
+ * while port is owned - others can
+ * do read only commands
*
*/
-class BWMeasure {
+class TrexPortOwner {
public:
- BWMeasure();
- void reset(void);
- double add(uint64_t size);
-private:
- double calc_MBsec(uint32_t dtime_msec,
- uint64_t dbytes);
+ TrexPortOwner();
-public:
- bool m_start;
- uint32_t m_last_time_msec;
- uint64_t m_last_bytes;
- double m_last_result;
-};
+ /**
+ * is port free to acquire
+ */
+ bool is_free() {
+ return m_is_free;
+ }
-/**
- * TRex stateless port stats
- *
- * @author imarom (24-Sep-15)
- */
-class TrexPortStats {
+ void release() {
+ m_is_free = true;
+ m_owner_name = "";
+ m_handler = "";
+ }
-public:
- TrexPortStats() {
- m_stats = {0};
+ bool is_owned_by(const std::string &user) {
+ return ( !m_is_free && (m_owner_name == user) );
+ }
+
+ void own(const std::string &owner_name) {
- m_bw_tx_bps.reset();
- m_bw_rx_bps.reset();
+ /* save user data */
+ m_owner_name = owner_name;
- m_bw_tx_pps.reset();
- m_bw_rx_pps.reset();
+ /* internal data */
+ m_handler = generate_handler();
+ m_is_free = false;
}
-public:
+ bool verify(const std::string &handler) {
+ return ( (!m_is_free) && (m_handler == handler) );
+ }
- BWMeasure m_bw_tx_bps;
- BWMeasure m_bw_rx_bps;
-
- BWMeasure m_bw_tx_pps;
- BWMeasure m_bw_rx_pps;
-
- struct {
-
- double m_tx_bps;
- double m_rx_bps;
-
- double m_tx_pps;
- double m_rx_pps;
-
- uint64_t m_total_tx_pkts;
- uint64_t m_total_rx_pkts;
-
- uint64_t m_total_tx_bytes;
- uint64_t m_total_rx_bytes;
-
- uint64_t m_tx_rx_errors;
- } m_stats;
+ const std::string &get_name() {
+ return (!m_is_free ? m_owner_name : g_unowned_name);
+ }
+
+ const std::string &get_handler() {
+ return (!m_is_free ? m_handler : g_unowned_handler);
+ }
+
+
+private:
+ std::string generate_handler();
+
+ /* is this port owned by someone ? */
+ bool m_is_free;
+
+ /* user provided info */
+ std::string m_owner_name;
+
+ /* handler genereated internally */
+ std::string m_handler;
+
+
+ /* just references defaults... */
+ static const std::string g_unowned_name;
+ static const std::string g_unowned_handler;
};
+
/**
* describes a stateless port
*
* @author imarom (31-Aug-15)
*/
class TrexStatelessPort {
+ friend class TrexDpPortEvent;
+
public:
/**
* port state
*/
enum port_state_e {
- PORT_STATE_DOWN,
- PORT_STATE_UP_IDLE,
- PORT_STATE_TRANSMITTING
+ PORT_STATE_DOWN = 0x1,
+ PORT_STATE_IDLE = 0x2,
+ PORT_STATE_STREAMS = 0x4,
+ PORT_STATE_TX = 0x8,
+ PORT_STATE_PAUSE = 0x10,
};
/**
@@ -115,31 +131,66 @@ public:
RC_ERR_FAILED_TO_COMPILE_STREAMS
};
- TrexStatelessPort(uint8_t port_id);
+
+ TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api);
+
+ /**
+ * acquire port
+ * throws TrexException in case of an error
+ */
+ void acquire(const std::string &user, bool force = false);
+
+ /**
+ * release the port from the current user
+ * throws TrexException in case of an error
+ */
+ void release(void);
+
+ /**
+ * validate the state of the port before start
+ * it will return a stream graph
+ * containing information about the streams
+ * configured on this port
+ *
+ * on error it throws TrexException
+ */
+ const TrexStreamsGraphObj *validate(void);
/**
* start traffic
- *
+ * throws TrexException in case of an error
*/
- rc_e start_traffic(void);
+ void start_traffic(const TrexPortMultiplier &mul, double duration);
/**
* stop traffic
- *
+ * throws TrexException in case of an error
*/
void stop_traffic(void);
/**
- * access the stream table
+ * pause traffic
+ * throws TrexException in case of an error
+ */
+ void pause_traffic(void);
+
+ /**
+ * resume traffic
+ * throws TrexException in case of an error
+ */
+ void resume_traffic(void);
+
+ /**
+ * update current traffic on port
*
*/
- TrexStreamTable *get_stream_table();
+ void update_traffic(const TrexPortMultiplier &mul);
/**
* get the port state
*
*/
- port_state_e get_state() {
+ port_state_e get_state() const {
return m_port_state;
}
@@ -147,7 +198,7 @@ public:
* port state as string
*
*/
- std::string get_state_as_string();
+ std::string get_state_as_string() const;
/**
* fill up properties of the port
@@ -157,74 +208,219 @@ public:
* @param driver
* @param speed
*/
- void get_properties(std::string &driver, std::string &speed);
+ void get_properties(std::string &driver, TrexPlatformApi::driver_speed_e &speed);
+
+
/**
- * query for ownership
- *
- */
- const std::string &get_owner() {
- return m_owner;
+ * encode stats as JSON
+ */
+ void encode_stats(Json::Value &port);
+
+ uint8_t get_port_id() {
+ return m_port_id;
}
/**
- * owner handler
- * for the connection
+ * delegators
*
*/
- const std::string &get_owner_handler() {
- return m_owner_handler;
+
+ void add_stream(TrexStream *stream) {
+ verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS);
+
+ m_stream_table.add_stream(stream);
+ delete_streams_graph();
+
+ change_state(PORT_STATE_STREAMS);
}
- bool is_free_to_aquire() {
- return (m_owner == "none");
+ void remove_stream(TrexStream *stream) {
+ verify_state(PORT_STATE_STREAMS);
+
+ m_stream_table.remove_stream(stream);
+ delete_streams_graph();
+
+ if (m_stream_table.size() == 0) {
+ change_state(PORT_STATE_IDLE);
+ }
}
- /**
- * take ownership of the server array
- * this is static
- * ownership is total
- *
- */
- void set_owner(const std::string &owner) {
- m_owner = owner;
- m_owner_handler = generate_handler();
+ void remove_and_delete_all_streams() {
+ verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS);
+
+ m_stream_table.remove_and_delete_all_streams();
+ delete_streams_graph();
+
+ change_state(PORT_STATE_IDLE);
}
- void clear_owner() {
- m_owner = "none";
- m_owner_handler = "";
+ TrexStream * get_stream_by_id(uint32_t stream_id) {
+ return m_stream_table.get_stream_by_id(stream_id);
}
- bool verify_owner_handler(const std::string &handler) {
+ void get_id_list(std::vector<uint32_t> &id_list) {
+ m_stream_table.get_id_list(id_list);
+ }
- return ( (m_owner != "none") && (m_owner_handler == handler) );
+ void get_object_list(std::vector<TrexStream *> &object_list) {
+ m_stream_table.get_object_list(object_list);
+ }
+ TrexDpPortEvents & get_dp_events() {
+ return m_dp_events;
}
+
/**
- * update the values of the stats
+ * returns the number of DP cores linked to this port
*
*/
- void update_stats();
+ uint8_t get_dp_core_count() {
+ return m_cores_id_list.size();
+ }
- const TrexPortStats & get_stats();
+ /**
+ * returns the traffic multiplier currently being used by the DP
+ *
+ */
+ double get_multiplier() {
+ return (m_factor);
+ }
/**
- * encode stats as JSON
+ * get port speed in bits per second
+ *
*/
- void encode_stats(Json::Value &port);
+ uint64_t get_port_speed_bps() const;
+
+ TrexPortOwner & get_owner() {
+ return m_owner;
+ }
private:
+
+ const std::vector<int> get_core_id_list () {
+ return m_cores_id_list;
+ }
+
+ bool verify_state(int state, bool should_throw = true) const;
+
+ void change_state(port_state_e new_state);
+
std::string generate_handler();
- TrexStreamTable m_stream_table;
- uint8_t m_port_id;
- port_state_e m_port_state;
- std::string m_owner;
- std::string m_owner_handler;
- TrexPortStats m_stats;
+ /**
+ * send message to all cores using duplicate
+ *
+ */
+ void send_message_to_all_dp(TrexStatelessCpToDpMsgBase *msg);
+
+ /**
+ * send message to specific DP core
+ *
+ */
+ void send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg);
+
+ /**
+ * triggered when event occurs
+ *
+ */
+ void on_dp_event_occured(TrexDpPortEvent::event_e event_type);
+
+
+ /**
+ * calculate effective M per core
+ *
+ */
+ double calculate_effective_factor(const TrexPortMultiplier &mul);
+
+
+
+ /**
+ * generates a graph of streams graph
+ *
+ */
+ void generate_streams_graph();
+
+ /**
+ * dispose of it
+ *
+ * @author imarom (26-Nov-15)
+ */
+ void delete_streams_graph();
+
+
+ TrexStreamTable m_stream_table;
+ uint8_t m_port_id;
+ port_state_e m_port_state;
+ std::string m_driver_name;
+
+ TrexPlatformApi::driver_speed_e m_speed;
+
+ /* holds the DP cores associated with this port */
+ std::vector<int> m_cores_id_list;
+
+ bool m_last_all_streams_continues;
+ double m_last_duration;
+ double m_factor;
+
+ TrexDpPortEvents m_dp_events;
+
+ /* holds a graph of streams rate*/
+ const TrexStreamsGraphObj *m_graph_obj;
+
+ /* owner information */
+ TrexPortOwner m_owner;
+};
+
+
+/**
+ * port multiplier object
+ *
+ */
+class TrexPortMultiplier {
+public:
+
+
+ /**
+ * defines the type of multipler passed to start
+ */
+ enum mul_type_e {
+ MUL_FACTOR,
+ MUL_BPS,
+ MUL_PPS,
+ MUL_PERCENTAGE
+ };
+
+ /**
+ * multiplier can be absolute value
+ * increment value or subtract value
+ */
+ enum mul_op_e {
+ OP_ABS,
+ OP_ADD,
+ OP_SUB
+ };
+
+
+ TrexPortMultiplier(mul_type_e type, mul_op_e op, double value) {
+ m_type = type;
+ m_op = op;
+ m_value = value;
+ }
+
+ TrexPortMultiplier(const std::string &type_str, const std::string &op_str, double value);
+
+
+public:
+ static const std::initializer_list<std::string> g_types;
+ static const std::initializer_list<std::string> g_ops;
+
+ mul_type_e m_type;
+ mul_op_e m_op;
+ double m_value;
};
#endif /* __TREX_STATELESS_PORT_H__ */
diff --git a/src/stateless/cp/trex_stream.cpp b/src/stateless/cp/trex_stream.cpp
index 182036f1..cad603e2 100644
--- a/src/stateless/cp/trex_stream.cpp
+++ b/src/stateless/cp/trex_stream.cpp
@@ -20,13 +20,82 @@ limitations under the License.
*/
#include <trex_stream.h>
#include <cstddef>
+#include <string.h>
+#include <assert.h>
/**************************************
* stream
*************************************/
-TrexStream::TrexStream(uint8_t port_id, uint32_t stream_id) : m_port_id(port_id), m_stream_id(stream_id) {
+
+
+std::string TrexStream::get_stream_type_str(stream_type_t stream_type){
+
+ std::string res;
+
+
+ switch (stream_type) {
+
+ case stCONTINUOUS :
+ res="stCONTINUOUS ";
+ break;
+
+ case stSINGLE_BURST :
+ res="stSINGLE_BURST ";
+ break;
+
+ case stMULTI_BURST :
+ res="stMULTI_BURST ";
+ break;
+ default:
+ res="Unknow ";
+ };
+ return(res);
+}
+
+
+void TrexStream::Dump(FILE *fd){
+
+ fprintf(fd,"\n");
+ fprintf(fd,"==> Stream_id : %lu \n",(ulong)m_stream_id);
+ fprintf(fd," Enabled : %lu \n",(ulong)(m_enabled?1:0));
+ fprintf(fd," Self_start : %lu \n",(ulong)(m_self_start?1:0));
+
+ if (m_next_stream_id>=0) {
+ fprintf(fd," Nex_stream_id : %lu \n",(ulong)m_next_stream_id);
+ }else {
+ fprintf(fd," Nex_stream_id : %d \n",m_next_stream_id);
+ }
+
+ fprintf(fd," Port_id : %lu \n",(ulong)m_port_id);
+
+ if (m_isg_usec>0.0) {
+ fprintf(fd," isg : %6.2f \n",m_isg_usec);
+ }
+ fprintf(fd," type : %s \n",get_stream_type_str(m_type).c_str());
+
+ if ( m_type == TrexStream::stCONTINUOUS ) {
+ fprintf(fd," pps : %f \n",m_pps);
+ }
+ if (m_type == TrexStream::stSINGLE_BURST) {
+ fprintf(fd," pps : %f \n",m_pps);
+ fprintf(fd," burst : %lu \n",(ulong)m_burst_total_pkts);
+ }
+ if (m_type == TrexStream::stMULTI_BURST) {
+ fprintf(fd," pps : %f \n",m_pps);
+ fprintf(fd," burst : %lu \n",(ulong)m_burst_total_pkts);
+ fprintf(fd," mburst : %lu \n",(ulong)m_num_bursts);
+ if (m_ibg_usec>0.0) {
+ fprintf(fd," m_ibg_usec : %f \n",m_ibg_usec);
+ }
+ }
+}
+
+
+TrexStream::TrexStream(uint8_t type,
+ uint8_t port_id, uint32_t stream_id) : m_port_id(port_id), m_stream_id(stream_id) {
/* default values */
+ m_type = type;
m_isg_usec = 0;
m_next_stream_id = -1;
m_enabled = false;
@@ -37,6 +106,11 @@ TrexStream::TrexStream(uint8_t port_id, uint32_t stream_id) : m_port_id(port_id)
m_rx_check.m_enable = false;
+
+ m_pps=-1.0;
+ m_burst_total_pkts=0;
+ m_num_bursts=1;
+ m_ibg_usec=0.0;
}
TrexStream::~TrexStream() {
@@ -56,6 +130,7 @@ TrexStream::get_stream_json() {
return m_stream_json;
}
+
/**************************************
* stream table
*************************************/
@@ -103,14 +178,24 @@ TrexStream * TrexStreamTable::get_stream_by_id(uint32_t stream_id) {
}
}
-void TrexStreamTable::get_stream_list(std::vector<uint32_t> &stream_list) {
- stream_list.clear();
+void TrexStreamTable::get_id_list(std::vector<uint32_t> &id_list) {
+ id_list.clear();
+
+ for (auto stream : m_stream_table) {
+ id_list.push_back(stream.first);
+ }
+}
+
+void TrexStreamTable::get_object_list(std::vector<TrexStream *> &object_list) {
+ object_list.clear();
for (auto stream : m_stream_table) {
- stream_list.push_back(stream.first);
+ object_list.push_back(stream.second);
}
+
}
int TrexStreamTable::size() {
return m_stream_table.size();
}
+
diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h
index f5bc96ef..b991b05f 100644
--- a/src/stateless/cp/trex_stream.h
+++ b/src/stateless/cp/trex_stream.h
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -29,22 +30,49 @@ limitations under the License.
#include <json/json.h>
#include <trex_stream_vm.h>
+#include <stdio.h>
+#include <string.h>
class TrexRpcCmdAddStream;
+
+struct CStreamPktData {
+ uint8_t *binary;
+ uint16_t len;
+
+ std::string meta;
+
+public:
+ inline void clone(uint8_t * in_binary,
+ uint32_t in_pkt_size){
+ binary = new uint8_t[in_pkt_size];
+ len = in_pkt_size;
+ memcpy(binary,in_binary,in_pkt_size);
+ }
+};
+
+
/**
* Stateless Stream
*
*/
class TrexStream {
- /* provide the RPC parser a way to access private fields */
- friend class TrexRpcCmdAddStream;
- friend class TrexRpcCmdGetStream;
- friend class TrexStreamTable;
public:
- TrexStream(uint8_t port_id, uint32_t stream_id);
- virtual ~TrexStream() = 0;
+ enum STREAM_TYPE {
+ stNONE = 0,
+ stCONTINUOUS = 4,
+ stSINGLE_BURST = 5,
+ stMULTI_BURST = 6
+ };
+
+ typedef uint8_t stream_type_t ;
+
+ static std::string get_stream_type_str(stream_type_t stream_type);
+
+public:
+ TrexStream(uint8_t type,uint8_t port_id, uint32_t stream_id);
+ virtual ~TrexStream();
/* defines the min max per packet supported */
static const uint32_t MIN_PKT_SIZE_BYTES = 1;
@@ -56,10 +84,89 @@ public:
/* access the stream json */
const Json::Value & get_stream_json();
-protected:
+ /* compress the stream id to be zero based */
+ void fix_dp_stream_id(uint32_t my_stream_id,int next_stream_id){
+ m_stream_id = my_stream_id;
+ m_next_stream_id = next_stream_id;
+ }
+
+ double get_pps() const {
+ return m_pps;
+ }
+
+ void set_pps(double pps){
+ m_pps = pps;
+ }
+
+ void set_type(uint8_t type){
+ m_type = type;
+ }
+
+ uint8_t get_type(void) const {
+ return ( m_type );
+ }
+
+ bool is_dp_next_stream(){
+ if (m_next_stream_id<0) {
+ return (false);
+ }else{
+ return (true);
+ }
+ }
+
+
+
+ void set_multi_burst(uint32_t burst_total_pkts,
+ uint32_t num_bursts,
+ double ibg_usec) {
+ m_burst_total_pkts = burst_total_pkts;
+ m_num_bursts = num_bursts;
+ m_ibg_usec = ibg_usec;
+ }
+
+ void set_single_burst(uint32_t burst_total_pkts){
+ set_multi_burst(burst_total_pkts,1,0.0);
+ }
+
+ /* create new stream */
+ TrexStream * clone_as_dp() const {
+
+ TrexStream *dp = new TrexStream(m_type,m_port_id,m_stream_id);
+
+
+ dp->m_isg_usec = m_isg_usec;
+ dp->m_next_stream_id = m_next_stream_id;
+
+ dp->m_enabled = m_enabled;
+ dp->m_self_start = m_self_start;
+
+ /* deep copy */
+ dp->m_pkt.clone(m_pkt.binary,m_pkt.len);
+
+ dp->m_rx_check = m_rx_check;
+ dp->m_pps = m_pps;
+ dp->m_burst_total_pkts = m_burst_total_pkts;
+ dp->m_num_bursts = m_num_bursts;
+ dp->m_ibg_usec = m_ibg_usec ;
+ return (dp);
+ }
+
+
+ double get_burst_length_usec() const {
+ return ( (m_burst_total_pkts / m_pps) * 1000 * 1000);
+ }
+
+ double get_bps() const {
+ /* packet length + 4 CRC bytes to bits and multiplied by PPS */
+ return (m_pps * (m_pkt.len + 4) * 8);
+ }
+
+ void Dump(FILE *fd);
+public:
/* basic */
+ uint8_t m_type;
uint8_t m_port_id;
- uint32_t m_stream_id;
+ uint32_t m_stream_id; /* id from RPC can be anything */
/* config fields */
@@ -69,13 +176,9 @@ protected:
/* indicators */
bool m_enabled;
bool m_self_start;
-
+
+ CStreamPktData m_pkt;
/* pkt */
- struct {
- uint8_t *binary;
- uint16_t len;
- std::string meta;
- } m_pkt;
/* VM */
StreamVm m_vm;
@@ -89,64 +192,19 @@ protected:
} m_rx_check;
+ double m_pps;
- /* original template provided by requester */
- Json::Value m_stream_json;
-};
-
-/**
- * continuous stream
- *
- */
-class TrexStreamContinuous : public TrexStream {
-public:
- TrexStreamContinuous(uint8_t port_id, uint32_t stream_id, double pps) : TrexStream(port_id, stream_id), m_pps(pps) {
- }
+ uint32_t m_burst_total_pkts; /* valid in case of burst stSINGLE_BURST,stMULTI_BURST*/
- double get_pps() {
- return m_pps;
- }
+ uint32_t m_num_bursts; /* valid in case of stMULTI_BURST */
-protected:
- double m_pps;
-};
+ double m_ibg_usec; /* valid in case of stMULTI_BURST */
-/**
- * single burst
- *
- */
-class TrexStreamBurst : public TrexStream {
-public:
- TrexStreamBurst(uint8_t port_id, uint32_t stream_id, uint32_t total_pkts, double pps) :
- TrexStream(port_id, stream_id),
- m_total_pkts(total_pkts),
- m_pps(pps) {
- }
+ /* original template provided by requester */
+ Json::Value m_stream_json;
-protected:
- uint32_t m_total_pkts;
- double m_pps;
};
-/**
- * multi burst
- *
- */
-class TrexStreamMultiBurst : public TrexStreamBurst {
-public:
- TrexStreamMultiBurst(uint8_t port_id,
- uint32_t stream_id,
- uint32_t pkts_per_burst,
- double pps,
- uint32_t num_bursts,
- double ibg_usec) : TrexStreamBurst(port_id, stream_id, pkts_per_burst, pps), m_num_bursts(num_bursts), m_ibg_usec(ibg_usec) {
-
- }
-protected:
- uint32_t m_num_bursts;
- double m_ibg_usec;
-
-};
/**
* holds all the streams
@@ -189,7 +247,13 @@ public:
*
* @param stream_list
*/
- void get_stream_list(std::vector<uint32_t> &stream_list);
+ void get_id_list(std::vector<uint32_t> &id_list);
+
+ /**
+ * populate a list with all the stream objects
+ *
+ */
+ void get_object_list(std::vector<TrexStream *> &object_list);
/**
* get the table size
@@ -197,6 +261,9 @@ public:
*/
int size();
+ std::unordered_map<int, TrexStream *>::iterator begin() {return m_stream_table.begin();}
+ std::unordered_map<int, TrexStream *>::iterator end() {return m_stream_table.end();}
+
private:
/**
* holds all the stream in a hash table by stream id
diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp
new file mode 100644
index 00000000..478e09f8
--- /dev/null
+++ b/src/stateless/cp/trex_streams_compiler.cpp
@@ -0,0 +1,746 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include <string>
+#include <sstream>
+#include <trex_streams_compiler.h>
+#include <trex_stream.h>
+#include <assert.h>
+#include <trex_stateless.h>
+#include <iostream>
+
+/**
+ * describes a graph node in the pre compile check
+ *
+ * @author imarom (16-Nov-15)
+ */
+class GraphNode {
+public:
+ GraphNode(const TrexStream *stream, GraphNode *next) : m_stream(stream), m_next(next) {
+ m_marked = false;
+ m_compressed_stream_id=-1;
+
+ }
+
+ uint32_t get_stream_id() const {
+ return m_stream->m_stream_id;
+ }
+
+ uint32_t get_next_stream_id() const {
+ return m_stream->m_next_stream_id;
+
+ }
+
+ const TrexStream *m_stream;
+ GraphNode *m_next;
+ std::vector<const GraphNode *> m_parents;
+ bool m_marked;
+ int m_compressed_stream_id;
+};
+
+/**
+ * node map
+ *
+ */
+class GraphNodeMap {
+public:
+
+ GraphNodeMap() : m_dead_end(NULL, NULL) {
+
+ }
+
+ bool add(GraphNode *node) {
+ if (has(node->get_stream_id())) {
+ return false;
+ }
+
+ m_nodes[node->get_stream_id()] = node;
+
+ if (node->m_stream->m_self_start) {
+ m_roots.push_back(node);
+ }
+
+ return true;
+ }
+
+ bool has(uint32_t stream_id) {
+
+ return (get(stream_id) != NULL);
+ }
+
+ GraphNode * get(uint32_t stream_id) {
+
+ if (stream_id == -1) {
+ return &m_dead_end;
+ }
+
+ auto search = m_nodes.find(stream_id);
+
+ if (search != m_nodes.end()) {
+ return search->second;
+ } else {
+ return NULL;
+ }
+ }
+
+ void clear_marks() {
+ for (auto node : m_nodes) {
+ node.second->m_marked = false;
+ }
+ }
+
+ void get_unmarked(std::vector <GraphNode *> &unmarked) {
+ for (auto node : m_nodes) {
+ if (!node.second->m_marked) {
+ unmarked.push_back(node.second);
+ }
+ }
+ }
+
+
+ ~GraphNodeMap() {
+ for (auto node : m_nodes) {
+ delete node.second;
+ }
+ m_nodes.clear();
+ }
+
+ std::vector <GraphNode *> & get_roots() {
+ return m_roots;
+ }
+
+
+ std::unordered_map<uint32_t, GraphNode *> get_nodes() {
+ return m_nodes;
+ }
+
+private:
+ std::unordered_map<uint32_t, GraphNode *> m_nodes;
+ std::vector <GraphNode *> m_roots;
+ GraphNode m_dead_end;
+};
+
+
+/**************************************
+ * stream compiled object
+ *************************************/
+TrexStreamsCompiledObj::TrexStreamsCompiledObj(uint8_t port_id) {
+ m_port_id = port_id;
+ m_all_continues = false;
+}
+
+TrexStreamsCompiledObj::~TrexStreamsCompiledObj() {
+ for (auto obj : m_objs) {
+ delete obj.m_stream;
+ }
+ m_objs.clear();
+}
+
+
+void
+TrexStreamsCompiledObj::add_compiled_stream(TrexStream *stream){
+
+ obj_st obj;
+
+ obj.m_stream = stream;
+
+ m_objs.push_back(obj);
+}
+
+
+TrexStreamsCompiledObj *
+TrexStreamsCompiledObj::clone() {
+
+ TrexStreamsCompiledObj *new_compiled_obj = new TrexStreamsCompiledObj(m_port_id);
+
+ /**
+ * clone each element
+ */
+ for (auto obj : m_objs) {
+ TrexStream *new_stream = obj.m_stream->clone_as_dp();
+ new_compiled_obj->add_compiled_stream(new_stream);
+ }
+
+ return new_compiled_obj;
+
+}
+
+void TrexStreamsCompiledObj::Dump(FILE *fd){
+ for (auto obj : m_objs) {
+ obj.m_stream->Dump(fd);
+ }
+}
+
+
+void
+TrexStreamsCompiler::add_warning(const std::string &warning) {
+ m_warnings.push_back("*** warning: " + warning);
+}
+
+void
+TrexStreamsCompiler::err(const std::string &err) {
+ throw TrexException("*** error: " + err);
+}
+
+void
+TrexStreamsCompiler::check_stream(const TrexStream *stream) {
+ std::stringstream ss;
+
+ /* cont. stream can point only on itself */
+ if (stream->get_type() == TrexStream::stCONTINUOUS) {
+ if (stream->m_next_stream_id != -1) {
+ ss << "continous stream '" << stream->m_stream_id << "' cannot point to another stream";
+ err(ss.str());
+ }
+ }
+}
+
+void
+TrexStreamsCompiler::allocate_pass(const std::vector<TrexStream *> &streams,
+ GraphNodeMap *nodes) {
+ std::stringstream ss;
+ uint32_t compressed_stream_id=0;
+
+
+ /* first pass - allocate all nodes and check for duplicates */
+ for (auto stream : streams) {
+
+ /* skip non enabled streams */
+ if (!stream->m_enabled) {
+ continue;
+ }
+
+ /* sanity check on the stream itself */
+ check_stream(stream);
+
+ /* duplicate stream id ? */
+ if (nodes->has(stream->m_stream_id)) {
+ ss << "duplicate instance of stream id " << stream->m_stream_id;
+ err(ss.str());
+ }
+
+ GraphNode *node = new GraphNode(stream, NULL);
+ /* allocate new compressed id */
+ node->m_compressed_stream_id = compressed_stream_id;
+
+ compressed_stream_id++;
+
+ /* add to the map */
+ assert(nodes->add(node));
+ }
+
+}
+
+/**
+ * on this pass we direct the graph to point to the right nodes
+ *
+ */
+void
+TrexStreamsCompiler::direct_pass(GraphNodeMap *nodes) {
+
+ /* second pass - direct the graph */
+ for (auto p : nodes->get_nodes()) {
+
+ GraphNode *node = p.second;
+ const TrexStream *stream = node->m_stream;
+
+ /* check the stream points on an existing stream */
+ GraphNode *next_node = nodes->get(stream->m_next_stream_id);
+ if (!next_node) {
+ std::stringstream ss;
+ ss << "stream " << node->get_stream_id() << " is pointing on non existent stream " << stream->m_next_stream_id;
+ err(ss.str());
+ }
+
+ node->m_next = next_node;
+
+ /* do we have more than one parent ? */
+ next_node->m_parents.push_back(node);
+ }
+
+
+ /* check for multiple parents */
+ for (auto p : nodes->get_nodes()) {
+ GraphNode *node = p.second;
+
+ if (node->m_parents.size() > 0 ) {
+ std::stringstream ss;
+
+ ss << "stream " << node->get_stream_id() << " is triggered by multiple streams: ";
+ for (auto x : node->m_parents) {
+ ss << x->get_stream_id() << " ";
+ }
+
+ add_warning(ss.str());
+ }
+ }
+}
+
+/**
+ * mark sure all the streams are reachable
+ *
+ */
+void
+TrexStreamsCompiler::check_for_unreachable_streams(GraphNodeMap *nodes) {
+ /* start with the roots */
+ std::vector <GraphNode *> next_nodes = nodes->get_roots();
+
+
+ nodes->clear_marks();
+
+ /* run BFS from all the roots */
+ while (!next_nodes.empty()) {
+
+ /* pull one */
+ GraphNode *node = next_nodes.back();
+ next_nodes.pop_back();
+ if (node->m_marked) {
+ continue;
+ }
+
+ node->m_marked = true;
+
+ if (node->m_next != NULL) {
+ next_nodes.push_back(node->m_next);
+ }
+
+ }
+
+ std::vector <GraphNode *> unmarked;
+ nodes->get_unmarked(unmarked);
+
+ if (!unmarked.empty()) {
+ std::stringstream ss;
+ for (auto node : unmarked) {
+ ss << "stream " << node->get_stream_id() << " is unreachable from any other stream\n";
+ }
+ err(ss.str());
+ }
+
+
+}
+
+/**
+ * check validation of streams for compile
+ *
+ * @author imarom (16-Nov-15)
+ *
+ * @param streams
+ * @param fail_msg
+ *
+ * @return bool
+ */
+void
+TrexStreamsCompiler::pre_compile_check(const std::vector<TrexStream *> &streams,
+ GraphNodeMap & nodes) {
+
+ m_warnings.clear();
+
+ /* allocate nodes */
+ allocate_pass(streams, &nodes);
+
+ /* direct the graph */
+ direct_pass(&nodes);
+
+ /* check for non reachable streams inside the graph */
+ check_for_unreachable_streams(&nodes);
+
+}
+
+/**************************************
+ * stream compiler
+ *************************************/
+bool
+TrexStreamsCompiler::compile(uint8_t port_id,
+ const std::vector<TrexStream *> &streams,
+ std::vector<TrexStreamsCompiledObj *> &objs,
+ uint8_t dp_core_count,
+ double factor,
+ std::string *fail_msg) {
+
+#if 0
+ for (auto stream : streams) {
+ stream->Dump(stdout);
+ }
+ fprintf(stdout,"------------pre compile \n");
+#endif
+
+ GraphNodeMap nodes;
+
+
+ /* compile checks */
+ try {
+ pre_compile_check(streams, nodes);
+ } catch (const TrexException &ex) {
+ if (fail_msg) {
+ *fail_msg = ex.what();
+ } else {
+ std::cout << ex.what();
+ }
+ return false;
+ }
+
+ /* check if all are cont. streams */
+ bool all_continues = true;
+ for (const auto stream : streams) {
+ if (stream->get_type() != TrexStream::stCONTINUOUS) {
+ all_continues = false;
+ break;
+ }
+ }
+
+ /* allocate objects for all DP cores */
+ for (uint8_t i = 0; i < dp_core_count; i++) {
+ TrexStreamsCompiledObj *obj = new TrexStreamsCompiledObj(port_id);
+ obj->m_all_continues = all_continues;
+ objs.push_back(obj);
+ }
+
+ /* compile all the streams */
+ for (auto stream : streams) {
+
+ /* skip non-enabled streams */
+ if (!stream->m_enabled) {
+ continue;
+ }
+
+ /* compile a single stream to all cores */
+ compile_stream(stream, factor, dp_core_count, objs, nodes);
+
+ }
+
+ return true;
+}
+
+/**
+ * compiles a single stream to DP objects
+ *
+ * @author imarom (03-Dec-15)
+ *
+ */
+void
+TrexStreamsCompiler::compile_stream(const TrexStream *stream,
+ double factor,
+ uint8_t dp_core_count,
+ std::vector<TrexStreamsCompiledObj *> &objs,
+ GraphNodeMap &nodes) {
+
+
+ /* fix the stream ids */
+ int new_id = nodes.get(stream->m_stream_id)->m_compressed_stream_id;
+ assert(new_id >= 0);
+
+ int new_next_id = -1;
+ if (stream->m_next_stream_id >= 0) {
+ new_next_id = nodes.get(stream->m_next_stream_id)->m_compressed_stream_id;
+ }
+
+ /* calculate rate */
+ double per_core_rate = (stream->m_pps * (factor / dp_core_count));
+ int per_core_burst_total_pkts = (stream->m_burst_total_pkts / dp_core_count);
+
+ std::vector<TrexStream *> per_core_streams(dp_core_count);
+
+ /* for each core - creates its own version of the stream */
+ for (uint8_t i = 0; i < dp_core_count; i++) {
+ TrexStream *dp_stream = stream->clone_as_dp();
+
+ /* fix stream ID */
+ dp_stream->fix_dp_stream_id(new_id, new_next_id);
+
+
+ /* adjust rate and packets count */
+ dp_stream->m_pps = per_core_rate;
+ dp_stream->m_burst_total_pkts = per_core_burst_total_pkts;
+
+ per_core_streams[i] = dp_stream;
+ }
+
+ /* take care of remainder from a burst */
+ int burst_remainder = stream->m_burst_total_pkts - (per_core_burst_total_pkts * dp_core_count);
+ per_core_streams[0]->m_burst_total_pkts += burst_remainder;
+
+ /* attach the compiled stream of every core to its object */
+ for (uint8_t i = 0; i < dp_core_count; i++) {
+ objs[i]->add_compiled_stream(per_core_streams[i]);
+ }
+
+
+}
+
+/**************************************
+ * streams graph
+ *************************************/
+
+/**
+ * for each stream we create the right rate events (up/down)
+ *
+ * @author imarom (24-Nov-15)
+ *
+ * @param offset_usec
+ * @param stream
+ */
+void
+TrexStreamsGraph::add_rate_events_for_stream(double &offset_usec, const TrexStream *stream) {
+
+ switch (stream->get_type()) {
+
+ case TrexStream::stCONTINUOUS:
+ add_rate_events_for_stream_cont(offset_usec, stream);
+ return;
+
+ case TrexStream::stSINGLE_BURST:
+ add_rate_events_for_stream_single_burst(offset_usec, stream);
+ return;
+
+ case TrexStream::stMULTI_BURST:
+ add_rate_events_for_stream_multi_burst(offset_usec, stream);
+ return;
+ }
+}
+
+/**
+ * continous stream
+ *
+ */
+void
+TrexStreamsGraph::add_rate_events_for_stream_cont(double &offset_usec, const TrexStream *stream) {
+
+ TrexStreamsGraphObj::rate_event_st start_event;
+
+ /* for debug purposes */
+ start_event.stream_id = stream->m_stream_id;
+
+ start_event.time = offset_usec + stream->m_isg_usec;
+ start_event.diff_pps = stream->get_pps();
+ start_event.diff_bps = stream->get_bps();
+ m_graph_obj->add_rate_event(start_event);
+
+ /* no more events after this stream */
+ offset_usec = -1;
+
+ /* also mark we have an inifite time */
+ m_graph_obj->m_expected_duration = -1;
+}
+
+/**
+ * single burst stream
+ *
+ */
+void
+TrexStreamsGraph::add_rate_events_for_stream_single_burst(double &offset_usec, const TrexStream *stream) {
+ TrexStreamsGraphObj::rate_event_st start_event;
+ TrexStreamsGraphObj::rate_event_st stop_event;
+
+
+ /* for debug purposes */
+ start_event.stream_id = stream->m_stream_id;
+ stop_event.stream_id = stream->m_stream_id;
+
+ /* start event */
+ start_event.time = offset_usec + stream->m_isg_usec;
+ start_event.diff_pps = stream->get_pps();
+ start_event.diff_bps = stream->get_bps();
+ m_graph_obj->add_rate_event(start_event);
+
+ /* stop event */
+ stop_event.time = start_event.time + stream->get_burst_length_usec();
+ stop_event.diff_pps = -(start_event.diff_pps);
+ stop_event.diff_bps = -(start_event.diff_bps);
+ m_graph_obj->add_rate_event(stop_event);
+
+ /* next stream starts from here */
+ offset_usec = stop_event.time;
+
+}
+
+/**
+ * multi burst stream
+ *
+ */
+void
+TrexStreamsGraph::add_rate_events_for_stream_multi_burst(double &offset_usec, const TrexStream *stream) {
+ TrexStreamsGraphObj::rate_event_st start_event;
+ TrexStreamsGraphObj::rate_event_st stop_event;
+
+ /* first the delay is the inter stream gap */
+ double delay = stream->m_isg_usec;
+
+ /* for debug purposes */
+
+ start_event.diff_pps = stream->get_pps();
+ start_event.diff_bps = stream->get_bps();
+ start_event.stream_id = stream->m_stream_id;
+
+ stop_event.diff_pps = -(start_event.diff_pps);
+ stop_event.diff_bps = -(start_event.diff_bps);
+ stop_event.stream_id = stream->m_stream_id;
+
+ /* for each burst create up/down events */
+ for (int i = 0; i < stream->m_num_bursts; i++) {
+
+ start_event.time = offset_usec + delay;
+ m_graph_obj->add_rate_event(start_event);
+
+ stop_event.time = start_event.time + stream->get_burst_length_usec();
+ m_graph_obj->add_rate_event(stop_event);
+
+ /* after the first burst, the delay is inter burst gap */
+ delay = stream->m_ibg_usec;
+
+ offset_usec = stop_event.time;
+ }
+}
+
+/**
+ * for a single root we can until done or a loop detected
+ *
+ * @author imarom (24-Nov-15)
+ *
+ * @param root_stream_id
+ */
+void
+TrexStreamsGraph::generate_graph_for_one_root(uint32_t root_stream_id) {
+
+ std::unordered_map<uint32_t, bool> loop_hash;
+ std::stringstream ss;
+
+ uint32_t stream_id = root_stream_id;
+ double offset = 0;
+
+ while (true) {
+ const TrexStream *stream;
+
+ /* fetch the stream from the hash - if it is not present, report an error */
+ try {
+ stream = m_streams_hash.at(stream_id);
+ } catch (const std::out_of_range &e) {
+ ss << "stream id " << stream_id << " does not exists";
+ throw TrexException(ss.str());
+ }
+
+ /* add the node to the hash for loop detection */
+ loop_hash[stream_id] = true;
+
+ /* create the right rate events for the stream */
+ add_rate_events_for_stream(offset, stream);
+
+ /* do we have a next stream ? */
+ if (stream->m_next_stream_id == -1) {
+ break;
+ }
+
+ /* loop detection */
+ auto search = loop_hash.find(stream->m_next_stream_id);
+ if (search != loop_hash.end()) {
+ m_graph_obj->on_loop_detection();
+ break;
+ }
+
+ /* handle the next one */
+ stream_id = stream->m_next_stream_id;
+ }
+}
+
+/**
+ * for a vector of streams generate a graph of BW
+ * see graph object for more details
+ *
+ */
+const TrexStreamsGraphObj *
+TrexStreamsGraph::generate(const std::vector<TrexStream *> &streams) {
+
+ /* main object to hold the graph - returned to the user */
+ m_graph_obj = new TrexStreamsGraphObj();
+
+ std::vector <uint32_t> root_streams;
+
+ /* before anything we create a hash streams ID
+ and grab the root nodes
+ */
+ for (TrexStream *stream : streams) {
+
+ /* skip non enabled streams */
+ if (!stream->m_enabled) {
+ continue;
+ }
+
+ /* for fast search we populate all the streams in a hash */
+ m_streams_hash[stream->m_stream_id] = stream;
+
+ /* hold all the self start nodes in a vector */
+ if (stream->m_self_start) {
+ root_streams.push_back(stream->m_stream_id);
+ }
+ }
+
+ /* for each node - scan until done or loop */
+ for (uint32_t root_id : root_streams) {
+ generate_graph_for_one_root(root_id);
+ }
+
+
+ m_graph_obj->generate();
+
+ return m_graph_obj;
+}
+
+/**************************************
+ * streams graph object
+ *************************************/
+void
+TrexStreamsGraphObj::find_max_rate() {
+ double max_rate_pps = 0;
+ double current_rate_pps = 0;
+
+ double max_rate_bps = 0;
+ double current_rate_bps = 0;
+
+ /* now we simply walk the list and hold the max */
+ for (auto &ev : m_rate_events) {
+
+ current_rate_pps += ev.diff_pps;
+ current_rate_bps += ev.diff_bps;
+
+ max_rate_pps = std::max(max_rate_pps, current_rate_pps);
+ max_rate_bps = std::max(max_rate_bps, current_rate_bps);
+ }
+
+ /* if not mark as inifite - get the last event time */
+ if (m_expected_duration != -1) {
+ m_expected_duration = m_rate_events.back().time;
+ }
+
+ m_max_pps = max_rate_pps;
+ m_max_bps = max_rate_bps;
+}
+
+static
+bool event_compare (const TrexStreamsGraphObj::rate_event_st &first, const TrexStreamsGraphObj::rate_event_st &second) {
+ return (first.time < second.time);
+}
+
+void
+TrexStreamsGraphObj::generate() {
+ m_rate_events.sort(event_compare);
+ find_max_rate();
+}
+
diff --git a/src/stateless/cp/trex_streams_compiler.h b/src/stateless/cp/trex_streams_compiler.h
new file mode 100644
index 00000000..7fe2dbf2
--- /dev/null
+++ b/src/stateless/cp/trex_streams_compiler.h
@@ -0,0 +1,229 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#ifndef __TREX_STREAMS_COMPILER_H__
+#define __TREX_STREAMS_COMPILER_H__
+
+#include <stdint.h>
+#include <vector>
+#include <list>
+#include <string>
+#include <unordered_map>
+
+class TrexStreamsCompiler;
+class TrexStream;
+class GraphNodeMap;
+
+/**
+ * compiled object for a table of streams
+ *
+ * @author imarom (28-Oct-15)
+ */
+class TrexStreamsCompiledObj {
+ friend class TrexStreamsCompiler;
+
+public:
+
+ TrexStreamsCompiledObj(uint8_t port_id);
+ ~TrexStreamsCompiledObj();
+
+ struct obj_st {
+
+ TrexStream * m_stream;
+ };
+
+ const std::vector<obj_st> & get_objects() {
+ return m_objs;
+ }
+
+ uint8_t get_port_id(){
+ return (m_port_id);
+ }
+
+ bool get_all_streams_continues(){
+ return (m_all_continues);
+ }
+
+ void Dump(FILE *fd);
+
+ TrexStreamsCompiledObj* clone();
+
+private:
+ void add_compiled_stream(TrexStream *stream);
+
+
+ std::vector<obj_st> m_objs;
+
+ bool m_all_continues;
+ uint8_t m_port_id;
+};
+
+class TrexStreamsCompiler {
+public:
+
+ /**
+ * compiles a vector of streams to an object passable to the DP
+ *
+ * @author imarom (28-Oct-15)
+ *
+ */
+ bool compile(uint8_t port_id,
+ const std::vector<TrexStream *> &streams,
+ std::vector<TrexStreamsCompiledObj *> &objs,
+ uint8_t dp_core_count = 1,
+ double factor = 1.0,
+ std::string *fail_msg = NULL);
+
+
+ /**
+ *
+ * returns a reference pointer to the last compile warnings
+ * if no warnings were produced - the vector is empty
+ */
+ const std::vector<std::string> & get_last_compile_warnings() {
+ return m_warnings;
+ }
+
+private:
+
+ void pre_compile_check(const std::vector<TrexStream *> &streams,
+ GraphNodeMap & nodes);
+ void allocate_pass(const std::vector<TrexStream *> &streams, GraphNodeMap *nodes);
+ void direct_pass(GraphNodeMap *nodes);
+ void check_for_unreachable_streams(GraphNodeMap *nodes);
+ void check_stream(const TrexStream *stream);
+ void add_warning(const std::string &warning);
+ void err(const std::string &err);
+
+ void compile_stream(const TrexStream *stream,
+ double factor,
+ uint8_t dp_core_count,
+ std::vector<TrexStreamsCompiledObj *> &objs,
+ GraphNodeMap &nodes);
+
+ std::vector<std::string> m_warnings;
+};
+
+class TrexStreamsGraph;
+
+/**************************************
+ * streams graph object
+ *
+ * holds the step graph for bandwidth
+ *************************************/
+class TrexStreamsGraphObj {
+ friend class TrexStreamsGraph;
+
+public:
+
+ TrexStreamsGraphObj() {
+ m_max_pps = 0;
+ m_max_bps = 0;
+ m_expected_duration = 0;
+ }
+
+ /**
+ * rate event is defined by those:
+ * time - the time of the event on the timeline
+ * diff - what is the nature of the change ?
+ *
+ * @author imarom (23-Nov-15)
+ */
+ struct rate_event_st {
+ double time;
+ double diff_pps;
+ double diff_bps;
+ uint32_t stream_id;
+ };
+
+ double get_max_pps() const {
+ return m_max_pps;
+ }
+
+ double get_max_bps() const {
+ return m_max_bps;
+ }
+
+ int get_duration() const {
+ return m_expected_duration;
+ }
+
+ const std::list<rate_event_st> & get_events() const {
+ return m_rate_events;
+ }
+
+
+private:
+
+ void on_loop_detection() {
+ m_expected_duration = -1;
+ }
+
+ void add_rate_event(const rate_event_st &ev) {
+ m_rate_events.push_back(ev);
+ }
+
+ void generate();
+ void find_max_rate();
+
+ double m_max_pps;
+ double m_max_bps;
+ int m_expected_duration;
+
+ /* list of rate events */
+ std::list<rate_event_st> m_rate_events;
+
+};
+
+/**
+ * graph creator
+ *
+ * @author imarom (23-Nov-15)
+ */
+class TrexStreamsGraph {
+public:
+
+ TrexStreamsGraph() {
+ m_graph_obj = NULL;
+ }
+
+ /**
+ * generate a sequence graph for streams
+ *
+ */
+ const TrexStreamsGraphObj * generate(const std::vector<TrexStream *> &streams);
+
+private:
+
+ void generate_graph_for_one_root(uint32_t root_stream_id);
+
+ void add_rate_events_for_stream(double &offset, const TrexStream *stream);
+ void add_rate_events_for_stream_cont(double &offset_usec, const TrexStream *stream);
+ void add_rate_events_for_stream_single_burst(double &offset_usec, const TrexStream *stream);
+ void add_rate_events_for_stream_multi_burst(double &offset_usec, const TrexStream *stream);
+
+ /* for fast processing of streams */
+ std::unordered_map<uint32_t, const TrexStream *> m_streams_hash;
+
+ /* main object to hold the graph - returned to the user */
+ TrexStreamsGraphObj *m_graph_obj;
+};
+
+#endif /* __TREX_STREAMS_COMPILER_H__ */
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index 3755b82c..22ca922d 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -18,118 +19,632 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
-
#include <trex_stateless_dp_core.h>
-#include <stdio.h>
-#include <unistd.h>
-#include <trex_stateless.h>
+#include <trex_stateless_messaging.h>
+#include <trex_streams_compiler.h>
+#include <trex_stream_node.h>
+#include <trex_stream.h>
#include <bp_sim.h>
-#ifndef TREX_RPC_MOCK_SERVER
-// DPDK c++ issue
-#define UINT8_MAX 255
-#define UINT16_MAX 0xFFFF
-// DPDK c++ issue
-#endif
+void CDpOneStream::Delete(CFlowGenListPerThread * core){
+ assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
+ core->free_node((CGenNode *)m_node);
+ delete m_dp_stream;
+ m_node=0;
+ m_dp_stream=0;
+}
-#include <rte_ethdev.h>
-#include "mbuf.h"
+void CDpOneStream::DeleteOnlyStream(){
+ assert(m_dp_stream);
+ delete m_dp_stream;
+ m_dp_stream=0;
+}
-/**
- * TEST
- *
- */
-static const uint8_t udp_pkt[]={
- 0x00,0x00,0x00,0x01,0x00,0x00,
- 0x00,0x00,0x00,0x01,0x00,0x00,
- 0x08,0x00,
-
- 0x45,0x00,0x00,0x81,
- 0xaf,0x7e,0x00,0x00,
- 0x12,0x11,0xd9,0x23,
- 0x01,0x01,0x01,0x01,
- 0x3d,0xad,0x72,0x1b,
-
- 0x11,0x11,
- 0x11,0x11,
-
- 0x00,0x6d,
- 0x00,0x00,
-
- 0x64,0x31,0x3a,0x61,
- 0x64,0x32,0x3a,0x69,0x64,
- 0x32,0x30,0x3a,0xd0,0x0e,
- 0xa1,0x4b,0x7b,0xbd,0xbd,
- 0x16,0xc6,0xdb,0xc4,0xbb,0x43,
- 0xf9,0x4b,0x51,0x68,0x33,0x72,
- 0x20,0x39,0x3a,0x69,0x6e,0x66,0x6f,
- 0x5f,0x68,0x61,0x73,0x68,0x32,0x30,0x3a,0xee,0xc6,0xa3,
- 0xd3,0x13,0xa8,0x43,0x06,0x03,0xd8,0x9e,0x3f,0x67,0x6f,
- 0xe7,0x0a,0xfd,0x18,0x13,0x8d,0x65,0x31,0x3a,0x71,0x39,
- 0x3a,0x67,0x65,0x74,0x5f,0x70,0x65,0x65,0x72,0x73,0x31,
- 0x3a,0x74,0x38,0x3a,0x3d,0xeb,0x0c,0xbf,0x0d,0x6a,0x0d,
- 0xa5,0x31,0x3a,0x79,0x31,0x3a,0x71,0x65,0x87,0xa6,0x7d,
- 0xe7
-};
-
-static int
-test_inject_pkt(uint8_t *pkt, uint32_t pkt_size) {
-
- #ifndef TREX_RPC_MOCK_SERVER
- rte_mempool_t * mp= CGlobalInfo::m_mem_pool[0].m_big_mbuf_pool ;
- #else
- rte_mempool_t * mp = NULL;
- #endif
-
- rte_mbuf_t *m = rte_pktmbuf_alloc(mp);
- if ( unlikely(m==0) ) {
- printf("ERROR no packets \n");
- return (-1);
+int CGenNodeStateless::get_stream_id(){
+ if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
+ return (-1); // not valid
}
- char *p = rte_pktmbuf_append(m, pkt_size);
- assert(p);
- /* set pkt data */
- memcpy(p,pkt,pkt_size);
+ assert(m_ref_stream_info);
+ return ((int)m_ref_stream_info->m_stream_id);
+}
+
+
+void CGenNodeStateless::DumpHeader(FILE *fd){
+ fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
+
+}
+void CGenNodeStateless::Dump(FILE *fd){
+ fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu \n",
+ m_time,
+ (ulong)m_port_id,
+ "s-pkt", //action
+ get_stream_state_str(m_state ).c_str(),
+ get_stream_id(), //stream_id
+ TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
+ (ulong)m_multi_bursts,
+ (ulong)m_single_burst
+ );
+}
+
+
+void CGenNodeStateless::refresh(){
+
+ /* refill the stream info */
+ m_single_burst = m_single_burst_refill;
+ m_multi_bursts = m_ref_stream_info->m_num_bursts;
+ m_state = CGenNodeStateless::ss_ACTIVE;
+}
+
+
+void CGenNodeCommand::free_command(){
+
+ assert(m_cmd);
+ m_cmd->on_node_remove();
+ delete m_cmd;
+}
+
+
+std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
+ std::string res;
+
+ switch (stream_state) {
+ case CGenNodeStateless::ss_FREE_RESUSE :
+ res="FREE ";
+ break;
+ case CGenNodeStateless::ss_INACTIVE :
+ res="INACTIVE ";
+ break;
+ case CGenNodeStateless::ss_ACTIVE :
+ res="ACTIVE ";
+ break;
+ default:
+ res="Unknow ";
+ };
+ return(res);
+}
+
+
+void CGenNodeStateless::free_stl_node(){
+ /* if we have cache mbuf free it */
+ rte_mbuf_t * m=get_cache_mbuf();
+ if (m) {
+ rte_pktmbuf_free(m);
+ m_cache_mbuf=0;
+ }
+}
+
+
+bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
+ m_active_streams-=d; /* reduce the number of streams */
+ if (m_active_streams == 0) {
+ return (true);
+ }
+ return (false);
+}
+
+bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
+
+ /* we are working with continues streams so we must be in transmit mode */
+ assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
+
+ for (auto dp_stream : m_active_nodes) {
+ CGenNodeStateless * node =dp_stream.m_node;
+ assert(node->get_port_id() == port_id);
+ assert(node->is_pause() == true);
+ node->set_pause(false);
+ }
+ m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
+ return (true);
+}
+
+bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
+
+ assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
+ (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
+
+ for (auto dp_stream : m_active_nodes) {
+ CGenNodeStateless * node = dp_stream.m_node;
+ assert(node->get_port_id() == port_id);
- rte_mbuf_t *tx_pkts[32];
- tx_pkts[0] = m;
- uint8_t nb_pkts = 1;
- uint16_t ret = rte_eth_tx_burst(0, 0, tx_pkts, nb_pkts);
- (void)ret;
- rte_pktmbuf_free(m);
+ node->update_rate(factor);
+ }
- return (0);
+ return (true);
}
-static int
-test_inject_udp_pkt(){
- return (test_inject_pkt((uint8_t*)udp_pkt,sizeof(udp_pkt)));
+bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
+
+ /* we are working with continues streams so we must be in transmit mode */
+ assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
+
+ for (auto dp_stream : m_active_nodes) {
+ CGenNodeStateless * node =dp_stream.m_node;
+ assert(node->get_port_id() == port_id);
+ assert(node->is_pause() == false);
+ node->set_pause(true);
+ }
+ m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
+ return (true);
}
+
+bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
+ bool stop_on_id,
+ int event_id){
+
+
+ if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
+ assert(m_active_streams==0);
+ return false;
+ }
+
+ /* there could be race of stop after stop */
+ if ( stop_on_id ) {
+ if (event_id != m_event_id){
+ /* we can't stop it is an old message */
+ return false;
+ }
+ }
+
+ for (auto dp_stream : m_active_nodes) {
+ CGenNodeStateless * node =dp_stream.m_node;
+ assert(node->get_port_id() == port_id);
+ if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
+ node->mark_for_free();
+ m_active_streams--;
+ dp_stream.DeleteOnlyStream();
+
+ }else{
+ dp_stream.Delete(m_core);
+ }
+ }
+
+ /* active stream should be zero */
+ assert(m_active_streams==0);
+ m_active_nodes.clear();
+ m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
+ return (true);
+}
+
+
+void TrexStatelessDpPerPort::create(CFlowGenListPerThread * core){
+ m_core=core;
+ m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
+ m_port_id=0;
+ m_active_streams=0;
+ m_active_nodes.clear();
+}
+
+
+
void
-TrexStatelessDpCore::test_inject_dummy_pkt() {
- test_inject_udp_pkt();
+TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
+ m_thread_id = thread_id;
+ m_core = core;
+ m_local_port_offset = 2*core->getDualPortId();
+
+ CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
+
+ m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
+ m_ring_to_cp = cp_dp->getRingDpToCp(thread_id);
+
+ m_state = STATE_IDLE;
+
+ int i;
+ for (i=0; i<NUM_PORTS_PER_CORE; i++) {
+ m_ports[i].create(core);
+ }
+}
+
+
+/* move to the next stream, old stream move to INACTIVE */
+bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
+ CGenNodeStateless * next_node){
+
+ assert(cur_node);
+ TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
+ bool schedule =false;
+
+ bool to_stop_port=false;
+
+ if (next_node == NULL) {
+ /* there is no next stream , reduce the number of active streams*/
+ to_stop_port = lp_port->update_number_of_active_streams(1);
+
+ }else{
+ uint8_t state=next_node->get_state();
+
+ /* can't be FREE_RESUSE */
+ assert(state != CGenNodeStateless::ss_FREE_RESUSE);
+ if (next_node->get_state() == CGenNodeStateless::ss_INACTIVE ) {
+
+ /* refill start info and scedule, no update in active streams */
+ next_node->refresh();
+ schedule = true;
+
+ }else{
+ to_stop_port = lp_port->update_number_of_active_streams(1);
+ }
+ }
+
+ if ( to_stop_port ) {
+ /* call stop port explictly to move the state */
+ stop_traffic(cur_node->m_port_id,false,0);
+ }
+
+ return ( schedule );
}
-/***************************
- * DP core
+
+
+/**
+ * in idle state loop, the processor most of the time sleeps
+ * and periodically checks for messages
*
- **************************/
-TrexStatelessDpCore::TrexStatelessDpCore(uint8_t core_id) : m_core_id(core_id) {
+ * @author imarom (01-Nov-15)
+ */
+void
+TrexStatelessDpCore::idle_state_loop() {
+
+ while (m_state == STATE_IDLE) {
+ periodic_check_for_cp_messages();
+ delay(200);
+ }
+}
+
+
+
+void TrexStatelessDpCore::quit_main_loop(){
+ m_core->set_terminate_mode(true); /* mark it as terminated */
+ m_state = STATE_TERMINATE;
+ add_global_duration(0.0001);
}
+
/**
- * main function for DP core
+ * scehduler runs when traffic exists
+ * it will return when no more transmitting is done on this
+ * core
*
+ * @author imarom (01-Nov-15)
*/
void
-TrexStatelessDpCore::run() {
- printf("\nOn DP core %d\n", m_core_id);
+TrexStatelessDpCore::start_scheduler() {
+ /* creates a maintenace job using the scheduler */
+ CGenNode * node_sync = m_core->create_node() ;
+ node_sync->m_type = CGenNode::FLOW_SYNC;
+ node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
+ m_core->m_node_gen.add_node(node_sync);
+
+ double old_offset = 0.0;
+ m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
+ /* bail out in case of terminate */
+ if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
+ m_core->m_node_gen.close_file(m_core);
+ m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */
+ }
+}
+
+
+void
+TrexStatelessDpCore::run_once(){
+
+ idle_state_loop();
+
+ if ( m_state == STATE_TERMINATE ){
+ return;
+ }
+
+ start_scheduler();
+}
+
+
+
+
+void
+TrexStatelessDpCore::start() {
+
while (true) {
- test_inject_dummy_pkt();
- rte_pause();
+ run_once();
+
+ if ( m_core->is_terminated_by_master() ) {
+ break;
+ }
+ }
+}
+
+/* only if both port are idle we can exit */
+void
+TrexStatelessDpCore::schedule_exit(){
+
+ CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
+
+ node->m_type = CGenNode::COMMAND;
+
+ node->m_cmd = new TrexStatelessDpCanQuit();
+
+ /* make sure it will be scheduled after the current node */
+ node->m_time = m_core->m_cur_time_sec ;
+
+ m_core->m_node_gen.add_node((CGenNode *)node);
+}
+
+
+void
+TrexStatelessDpCore::add_global_duration(double duration){
+ if (duration > 0.0) {
+ CGenNode *node = m_core->create_node() ;
+
+ node->m_type = CGenNode::EXIT_SCHED;
+
+ /* make sure it will be scheduled after the current node */
+ node->m_time = m_core->m_cur_time_sec + duration ;
+
+ m_core->m_node_gen.add_node(node);
+ }
+}
+
+/* add per port exit */
+void
+TrexStatelessDpCore::add_port_duration(double duration,
+ uint8_t port_id,
+ int event_id){
+ if (duration > 0.0) {
+ CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
+
+ node->m_type = CGenNode::COMMAND;
+
+ /* make sure it will be scheduled after the current node */
+ node->m_time = m_core->m_cur_time_sec + duration ;
+
+ TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
+
+
+ /* test this */
+ m_core->m_non_active_nodes++;
+ cmd->set_core_ptr(m_core);
+ cmd->set_event_id(event_id);
+ cmd->set_wait_for_event_id(true);
+
+ node->m_cmd = cmd;
+
+ m_core->m_node_gen.add_node((CGenNode *)node);
+ }
+}
+
+
+void
+TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
+ TrexStream * stream,
+ TrexStreamsCompiledObj *comp) {
+
+ CGenNodeStateless *node = m_core->create_node_sl();
+
+ /* add periodic */
+ node->m_type = CGenNode::STATELESS_PKT;
+
+ node->m_ref_stream_info = stream->clone_as_dp();
+
+ node->m_next_stream=0; /* will be fixed later */
+
+
+ if ( stream->m_self_start ){
+ /* if self start it is in active mode */
+ node->m_state =CGenNodeStateless::ss_ACTIVE;
+ lp_port->m_active_streams++;
+ }else{
+ node->m_state =CGenNodeStateless::ss_INACTIVE;
}
+
+ node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);
+
+ pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
+ node->m_flags = 0;
+
+ /* set socket id */
+ node->set_socket_id(m_core->m_node_gen.m_socket_id);
+
+ /* build a mbuf from a packet */
+
+ uint16_t pkt_size = stream->m_pkt.len;
+ const uint8_t *stream_pkt = stream->m_pkt.binary;
+
+ node->m_pause =0;
+ node->m_stream_type = stream->m_type;
+ node->m_next_time_offset = 1.0 / stream->get_pps();
+
+ /* stateless specific fields */
+ switch ( stream->m_type ) {
+
+ case TrexStream::stCONTINUOUS :
+ node->m_single_burst=0;
+ node->m_single_burst_refill=0;
+ node->m_multi_bursts=0;
+ node->m_ibg_sec = 0.0;
+ break;
+
+ case TrexStream::stSINGLE_BURST :
+ node->m_stream_type = TrexStream::stMULTI_BURST;
+ node->m_single_burst = stream->m_burst_total_pkts;
+ node->m_single_burst_refill = stream->m_burst_total_pkts;
+ node->m_multi_bursts = 1; /* single burst in multi burst of 1 */
+ node->m_ibg_sec = 0.0;
+ break;
+
+ case TrexStream::stMULTI_BURST :
+ node->m_single_burst = stream->m_burst_total_pkts;
+ node->m_single_burst_refill = stream->m_burst_total_pkts;
+ node->m_multi_bursts = stream->m_num_bursts;
+ node->m_ibg_sec = usec_to_sec( stream->m_ibg_usec );
+ break;
+ default:
+
+ assert(0);
+ };
+
+ node->m_port_id = stream->m_port_id;
+
+ /* allocate const mbuf */
+ rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
+ assert(m);
+
+ char *p = rte_pktmbuf_append(m, pkt_size);
+ assert(p);
+ /* copy the packet */
+ memcpy(p,stream_pkt,pkt_size);
+
+ /* set dir 0 or 1 client or server */
+ node->set_mbuf_cache_dir(dir);
+
+ /* TBD repace the mac if req we should add flag */
+ m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, m);
+
+ /* set the packet as a readonly */
+ node->set_cache_mbuf(m);
+
+ CDpOneStream one_stream;
+
+ one_stream.m_dp_stream = node->m_ref_stream_info;
+ one_stream.m_node =node;
+
+ lp_port->m_active_nodes.push_back(one_stream);
+
+ /* schedule only if active */
+ if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
+ m_core->m_node_gen.add_node((CGenNode *)node);
+ }
+}
+
+void
+TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
+ double duration,
+ int event_id) {
+
+
+ TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
+ lp_port->m_active_streams = 0;
+ lp_port->set_event_id(event_id);
+
+ /* no nodes in the list */
+ assert(lp_port->m_active_nodes.size()==0);
+
+ for (auto single_stream : obj->get_objects()) {
+ /* all commands should be for the same port */
+ assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
+ add_stream(lp_port,single_stream.m_stream,obj);
+ }
+
+ uint32_t nodes = lp_port->m_active_nodes.size();
+ /* find next stream */
+ assert(nodes == obj->get_objects().size());
+
+ int cnt=0;
+
+ /* set the next_stream pointer */
+ for (auto single_stream : obj->get_objects()) {
+
+ if (single_stream.m_stream->is_dp_next_stream() ) {
+ int stream_id = single_stream.m_stream->m_next_stream_id;
+ assert(stream_id<nodes);
+ /* point to the next stream , stream_id is fixed */
+ lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
+ }
+ cnt++;
+ }
+
+ lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
+ m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
+
+
+ if ( duration > 0.0 ){
+ add_port_duration( duration ,obj->get_port_id(),event_id );
+ }
+
+}
+
+
+bool TrexStatelessDpCore::are_all_ports_idle(){
+
+ bool res=true;
+ int i;
+ for (i=0; i<NUM_PORTS_PER_CORE; i++) {
+ if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
+ res=false;
+ }
+ }
+ return (res);
+}
+
+
+void
+TrexStatelessDpCore::resume_traffic(uint8_t port_id){
+
+ TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
+
+ lp_port->resume_traffic(port_id);
+}
+
+
+void
+TrexStatelessDpCore::pause_traffic(uint8_t port_id){
+
+ TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
+
+ lp_port->pause_traffic(port_id);
+}
+
+void
+TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
+
+ TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
+
+ lp_port->update_traffic(port_id, factor);
+}
+
+
+void
+TrexStatelessDpCore::stop_traffic(uint8_t port_id,
+ bool stop_on_id,
+ int event_id) {
+ /* we cannot remove nodes not from the top of the queue so
+ for every active node - make sure next time
+ the scheduler invokes it, it will be free */
+
+ TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
+
+ if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
+ /* nothing to do ! already stopped */
+ //printf(" skip .. %f\n",m_core->m_cur_time_sec);
+ return;
+ }
+
+#if 0
+ if ( are_all_ports_idle() ) {
+ /* just a place holder if we will need to do somthing in that case */
+ }
+#endif
+
+ /* inform the control plane we stopped - this might be a async stop
+ (streams ended)
+ */
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
+ TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
+ port_id,
+ TrexDpPortEvent::EVENT_STOP,
+ lp_port->get_event_id());
+ ring->Enqueue((CGenNode *)event_msg);
+
+}
+
+/**
+ * handle a message from CP to DP
+ *
+ */
+void
+TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
+ msg->handle(this);
+ delete msg;
}
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index 4b09b752..7dc4a2b2 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -21,23 +22,262 @@ limitations under the License.
#ifndef __TREX_STATELESS_DP_CORE_H__
#define __TREX_STATELESS_DP_CORE_H__
-#include <stdint.h>
+#include <vector>
+
+#include <msg_manager.h>
+#include <pal_utl.h>
+
+class TrexStatelessCpToDpMsgBase;
+class TrexStatelessDpStart;
+class CFlowGenListPerThread;
+class CGenNodeStateless;
+class TrexStreamsCompiledObj;
+class TrexStream;
+
+
+class CDpOneStream {
+public:
+ void Create(){
+ }
+
+ void Delete(CFlowGenListPerThread * core);
+ void DeleteOnlyStream();
+
+ CGenNodeStateless * m_node; // schedule node
+ TrexStream * m_dp_stream; // stream info
+};
+
+class TrexStatelessDpPerPort {
+
+public:
+ /* states */
+ enum state_e {
+ ppSTATE_IDLE,
+ ppSTATE_TRANSMITTING,
+ ppSTATE_PAUSE
+
+ };
+
+public:
+ TrexStatelessDpPerPort(){
+ }
+
+ void create(CFlowGenListPerThread * core);
+
+ bool pause_traffic(uint8_t port_id);
+
+ bool resume_traffic(uint8_t port_id);
+
+ bool update_traffic(uint8_t port_id, double factor);
+
+ bool stop_traffic(uint8_t port_id,
+ bool stop_on_id,
+ int event_id);
+
+ bool update_number_of_active_streams(uint32_t d);
+
+ state_e get_state() {
+ return m_state;
+ }
+
+ void set_event_id(int event_id) {
+ m_event_id = event_id;
+ }
+
+ int get_event_id() {
+ return m_event_id;
+ }
+
+public:
+
+ state_e m_state;
+ uint8_t m_port_id;
+
+ uint32_t m_active_streams; /* how many active streams on this port */
+
+ std::vector<CDpOneStream> m_active_nodes; /* holds the current active nodes */
+ CFlowGenListPerThread * m_core ;
+ int m_event_id;
+};
+
+/* for now */
+#define NUM_PORTS_PER_CORE 2
-/**
- * stateless DP core object
- *
- */
class TrexStatelessDpCore {
+
public:
- TrexStatelessDpCore(uint8_t core_id);
+ /* states */
+ enum state_e {
+ STATE_IDLE,
+ STATE_TRANSMITTING,
+ STATE_TERMINATE
+
+ };
+
+ TrexStatelessDpCore() {
+ m_thread_id = 0;
+ m_core = NULL;
+ m_duration = -1;
+ }
+
+ /**
+ * "static constructor"
+ *
+ * @param thread_id
+ * @param core
+ */
+ void create(uint8_t thread_id, CFlowGenListPerThread *core);
+
+ /**
+ * launch the stateless DP core code
+ *
+ */
+ void start();
+
+
+ /* exit after batch of commands */
+ void run_once();
+
+ /**
+ * dummy traffic creator
+ *
+ * @author imarom (27-Oct-15)
+ *
+ * @param pkt
+ * @param pkt_len
+ */
+ void start_traffic(TrexStreamsCompiledObj *obj,
+ double duration,
+ int m_event_id);
+
+
+ /* pause the streams, work only if all are continues */
+ void pause_traffic(uint8_t port_id);
+
+
+
+ void resume_traffic(uint8_t port_id);
+
+
+ /**
+ * update current traffic rate
+ *
+ * @author imarom (25-Nov-15)
+ *
+ */
+ void update_traffic(uint8_t port_id, double mul);
+
+ /**
+ *
+ * stop all traffic for this core
+ *
+ */
+ void stop_traffic(uint8_t port_id,bool stop_on_id, int event_id);
+
+
+ /* return if all ports are idel */
+ bool are_all_ports_idle();
+
+
+ /**
+ * check for and handle messages from CP
+ *
+ * @author imarom (27-Oct-15)
+ */
+ void periodic_check_for_cp_messages() {
+ // doing this inline for performance reasons
+
+ /* fast path */
+ if ( likely ( m_ring_from_cp->isEmpty() ) ) {
+ return;
+ }
+
+ while ( true ) {
+ CGenNode * node = NULL;
+ if (m_ring_from_cp->Dequeue(node) != 0) {
+ break;
+ }
+ assert(node);
+
+ TrexStatelessCpToDpMsgBase * msg = (TrexStatelessCpToDpMsgBase *)node;
+ handle_cp_msg(msg);
+ }
+
+ }
+
+ /* quit the main loop, work in both stateless in stateful, don't free memory trigger from master */
+ void quit_main_loop();
+
+ state_e get_state() {
+ return m_state;
+ }
+
+ bool set_stateless_next_node(CGenNodeStateless * cur_node,
+ CGenNodeStateless * next_node);
+
+
+ TrexStatelessDpPerPort * get_port_db(uint8_t port_id){
+ assert((m_local_port_offset==port_id) ||(m_local_port_offset+1==port_id));
+ uint8_t local_port_id = port_id -m_local_port_offset;
+ assert(local_port_id<NUM_PORTS_PER_CORE);
+ return (&m_ports[local_port_id]);
+ }
+
- /* starts the DP core run */
- void run();
private:
- void test_inject_dummy_pkt();
- uint8_t m_core_id;
+
+ void schedule_exit();
+
+
+ /**
+ * in idle state loop, the processor most of the time sleeps
+ * and periodically checks for messages
+ *
+ */
+ void idle_state_loop();
+
+ /**
+ * real job is done when scheduler is launched
+ *
+ */
+ void start_scheduler();
+
+ /**
+ * handles a CP to DP message
+ *
+ * @author imarom (27-Oct-15)
+ *
+ * @param msg
+ */
+ void handle_cp_msg(TrexStatelessCpToDpMsgBase *msg);
+
+
+ void add_port_duration(double duration,
+ uint8_t port_id,
+ int event_id);
+
+ void add_global_duration(double duration);
+
+ void add_stream(TrexStatelessDpPerPort * lp_port,
+ TrexStream * stream,
+ TrexStreamsCompiledObj *comp);
+
+ uint8_t m_thread_id;
+ uint8_t m_local_port_offset;
+
+ state_e m_state; /* state of all ports */
+ CNodeRing *m_ring_from_cp;
+ CNodeRing *m_ring_to_cp;
+
+ TrexStatelessDpPerPort m_ports[NUM_PORTS_PER_CORE];
+
+ /* pointer to the main object */
+ CFlowGenListPerThread * m_core;
+
+ double m_duration;
};
#endif /* __TREX_STATELESS_DP_CORE_H__ */
+
diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h
new file mode 100644
index 00000000..111af845
--- /dev/null
+++ b/src/stateless/dp/trex_stream_node.h
@@ -0,0 +1,284 @@
+/*
+ Itay Marom
+ Hanoh Haim
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#ifndef __TREX_STREAM_NODE_H__
+#define __TREX_STREAM_NODE_H__
+
+#include <bp_sim.h>
+#include <stdio.h>
+
+class TrexStatelessDpCore;
+#include <trex_stream.h>
+
+class TrexStatelessCpToDpMsgBase;
+class CFlowGenListPerThread;
+
+struct CGenNodeCommand : public CGenNodeBase {
+
+friend class TrexStatelessDpCore;
+
+public:
+ TrexStatelessCpToDpMsgBase * m_cmd;
+
+ uint8_t m_pad_end[104];
+
+public:
+ void free_command();
+
+} __rte_cache_aligned;;
+
+
+static_assert(sizeof(CGenNodeCommand) == sizeof(CGenNode), "sizeof(CGenNodeCommand) != sizeof(CGenNode)" );
+
+
+/* this is a event for stateless */
+struct CGenNodeStateless : public CGenNodeBase {
+friend class TrexStatelessDpCore;
+
+public:
+ enum {
+ ss_FREE_RESUSE =1, /* should be free by scheduler */
+ ss_INACTIVE =2, /* will be active by other stream or stopped */
+ ss_ACTIVE =3 /* the stream is active */
+ };
+ typedef uint8_t stream_state_t ;
+
+ static std::string get_stream_state_str(stream_state_t stream_state);
+
+private:
+ /* cache line 0 */
+ /* important stuff here */
+ void * m_cache_mbuf;
+
+ double m_next_time_offset; /* in sec */
+ double m_ibg_sec; /* inter burst time in sec */
+
+
+ stream_state_t m_state;
+ uint8_t m_port_id;
+ uint8_t m_stream_type; /* see TrexStream::STREAM_TYPE ,stream_type_t */
+ uint8_t m_pause;
+
+ uint32_t m_single_burst; /* the number of bursts in case of burst */
+ uint32_t m_single_burst_refill;
+
+ uint32_t m_multi_bursts; /* in case of multi_burst how many bursts */
+
+ /* cache line 1 */
+ TrexStream * m_ref_stream_info; /* the stream info */
+ CGenNodeStateless * m_next_stream;
+
+ /* pad to match the size of CGenNode */
+ uint8_t m_pad_end[56];
+
+
+
+
+public:
+
+ uint8_t get_port_id(){
+ return (m_port_id);
+ }
+
+
+ /**
+ * calculate the time offset based
+ * on the PPS and multiplier
+ *
+ */
+ void update_rate(double factor) {
+ /* update the inter packet gap */
+ m_next_time_offset = m_next_time_offset / factor;
+ }
+
+ /* we restart the stream, schedule it using stream isg */
+ inline void update_refresh_time(double cur_time){
+ m_time = cur_time + usec_to_sec(m_ref_stream_info->m_isg_usec);
+ }
+
+ inline bool is_mask_for_free(){
+ return (get_state() == CGenNodeStateless::ss_FREE_RESUSE ?true:false);
+
+ }
+ inline void mark_for_free(){
+ set_state(CGenNodeStateless::ss_FREE_RESUSE);
+ /* only to be safe */
+ m_ref_stream_info= NULL;
+ m_next_stream= NULL;
+ }
+
+ bool is_pause(){
+ return (m_pause==1?true:false);
+ }
+
+ void set_pause(bool enable){
+ if ( enable ){
+ m_pause=1;
+ }else{
+ m_pause=0;
+ }
+ }
+
+ inline uint8_t get_stream_type(){
+ return (m_stream_type);
+ }
+
+ inline uint32_t get_single_burst_cnt(){
+ return (m_single_burst);
+ }
+
+ inline double get_multi_ibg_sec(){
+ return (m_ibg_sec);
+ }
+
+ inline uint32_t get_multi_burst_cnt(){
+ return (m_multi_bursts);
+ }
+
+ inline void set_state(stream_state_t new_state){
+ m_state=new_state;
+ }
+
+
+ inline stream_state_t get_state() {
+ return m_state;
+ }
+
+ void refresh();
+
+ inline void handle_continues(CFlowGenListPerThread *thread) {
+
+ if (unlikely (is_pause()==false)) {
+ thread->m_node_gen.m_v_if->send_node( (CGenNode *)this);
+ }
+
+ /* in case of continues */
+ m_time += m_next_time_offset;
+
+ /* insert a new event */
+ thread->m_node_gen.m_p_queue.push( (CGenNode *)this);
+ }
+
+ inline void handle_multi_burst(CFlowGenListPerThread *thread) {
+ thread->m_node_gen.m_v_if->send_node( (CGenNode *)this);
+
+ m_single_burst--;
+ if (m_single_burst > 0 ) {
+ /* in case of continues */
+ m_time += m_next_time_offset;
+
+ thread->m_node_gen.m_p_queue.push( (CGenNode *)this);
+ }else{
+ m_multi_bursts--;
+ if ( m_multi_bursts == 0 ) {
+ set_state(CGenNodeStateless::ss_INACTIVE);
+ if ( thread->set_stateless_next_node(this,m_next_stream) ){
+ /* update the next stream time using isg */
+ m_next_stream->update_refresh_time(m_time);
+
+ thread->m_node_gen.m_p_queue.push( (CGenNode *)m_next_stream);
+ }else{
+ // in case of zero we will schedule a command to stop
+ // will be called from set_stateless_next_node
+ }
+
+ }else{
+ m_time += m_ibg_sec;
+ m_single_burst = m_single_burst_refill;
+ thread->m_node_gen.m_p_queue.push( (CGenNode *)this);
+ }
+ }
+ }
+
+
+ /**
+ * main function to handle an event of a packet tx
+ *
+ *
+ *
+ */
+
+ inline void handle(CFlowGenListPerThread *thread) {
+
+ if (m_stream_type == TrexStream::stCONTINUOUS ) {
+ handle_continues(thread) ;
+ }else{
+ if (m_stream_type == TrexStream::stMULTI_BURST) {
+ handle_multi_burst(thread);
+ }else{
+ assert(0);
+ }
+ }
+
+ }
+
+ void set_socket_id(socket_id_t socket){
+ m_socket_id=socket;
+ }
+
+ socket_id_t get_socket_id(){
+ return ( m_socket_id );
+ }
+
+ inline void set_mbuf_cache_dir(pkt_dir_t dir){
+ if (dir) {
+ m_flags |=NODE_FLAGS_DIR;
+ }else{
+ m_flags &=~NODE_FLAGS_DIR;
+ }
+ }
+
+ inline pkt_dir_t get_mbuf_cache_dir(){
+ return ((pkt_dir_t)( m_flags &1));
+ }
+
+ inline void set_cache_mbuf(rte_mbuf_t * m){
+ m_cache_mbuf=(void *)m;
+ m_flags |= NODE_FLAGS_MBUF_CACHE;
+ }
+
+ inline rte_mbuf_t * get_cache_mbuf(){
+ if ( m_flags &NODE_FLAGS_MBUF_CACHE ) {
+ return ((rte_mbuf_t *)m_cache_mbuf);
+ }else{
+ return ((rte_mbuf_t *)0);
+ }
+ }
+
+ void free_stl_node();
+
+public:
+ /* debug functions */
+
+ int get_stream_id();
+
+ static void DumpHeader(FILE *fd);
+
+ void Dump(FILE *fd);
+
+} __rte_cache_aligned;
+
+static_assert(sizeof(CGenNodeStateless) == sizeof(CGenNode), "sizeof(CGenNodeStateless) != sizeof(CGenNode)" );
+
+
+
+
+#endif /* __TREX_STREAM_NODE_H__ */
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
new file mode 100644
index 00000000..257de168
--- /dev/null
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -0,0 +1,191 @@
+/*
+ Itay Marom
+ Hanoch Haim
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#include <trex_stateless_messaging.h>
+#include <trex_stateless_dp_core.h>
+#include <trex_streams_compiler.h>
+#include <trex_stateless.h>
+#include <bp_sim.h>
+
+#include <string.h>
+
+/*************************
+ start traffic message
+ ************************/
+TrexStatelessDpStart::TrexStatelessDpStart(uint8_t port_id, int event_id, TrexStreamsCompiledObj *obj, double duration) {
+ m_port_id = port_id;
+ m_event_id = event_id;
+ m_obj = obj;
+ m_duration = duration;
+}
+
+
+/**
+ * clone for DP start message
+ *
+ */
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpStart::clone() {
+
+ TrexStreamsCompiledObj *new_obj = m_obj->clone();
+
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpStart(m_port_id, m_event_id, new_obj, m_duration);
+
+ return new_msg;
+}
+
+TrexStatelessDpStart::~TrexStatelessDpStart() {
+ if (m_obj) {
+ delete m_obj;
+ }
+}
+
+bool
+TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) {
+
+ /* staet traffic */
+ dp_core->start_traffic(m_obj, m_duration,m_event_id);
+
+ return true;
+}
+
+/*************************
+ stop traffic message
+ ************************/
+bool
+TrexStatelessDpStop::handle(TrexStatelessDpCore *dp_core) {
+
+
+ dp_core->stop_traffic(m_port_id,m_stop_only_for_event_id,m_event_id);
+ return true;
+}
+
+
+void TrexStatelessDpStop::on_node_remove(){
+ if ( m_core ) {
+ assert(m_core->m_non_active_nodes>0);
+ m_core->m_non_active_nodes--;
+ }
+}
+
+
+TrexStatelessCpToDpMsgBase * TrexStatelessDpPause::clone(){
+
+ TrexStatelessDpPause *new_msg = new TrexStatelessDpPause(m_port_id);
+ return new_msg;
+}
+
+
+bool TrexStatelessDpPause::handle(TrexStatelessDpCore *dp_core){
+ dp_core->pause_traffic(m_port_id);
+ return (true);
+}
+
+
+
+TrexStatelessCpToDpMsgBase * TrexStatelessDpResume::clone(){
+ TrexStatelessDpResume *new_msg = new TrexStatelessDpResume(m_port_id);
+ return new_msg;
+}
+
+bool TrexStatelessDpResume::handle(TrexStatelessDpCore *dp_core){
+ dp_core->resume_traffic(m_port_id);
+ return (true);
+}
+
+
+/**
+ * clone for DP stop message
+ *
+ */
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpStop::clone() {
+ TrexStatelessDpStop *new_msg = new TrexStatelessDpStop(m_port_id);
+
+ new_msg->set_event_id(m_event_id);
+ new_msg->set_wait_for_event_id(m_stop_only_for_event_id);
+ /* set back pointer to master */
+ new_msg->set_core_ptr(m_core);
+
+ return new_msg;
+}
+
+
+
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpQuit::clone(){
+
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpQuit();
+
+ return new_msg;
+}
+
+
+bool TrexStatelessDpQuit::handle(TrexStatelessDpCore *dp_core){
+
+ /* quit */
+ dp_core->quit_main_loop();
+ return (true);
+}
+
+bool TrexStatelessDpCanQuit::handle(TrexStatelessDpCore *dp_core){
+
+ if ( dp_core->are_all_ports_idle() ){
+ /* if all ports are idle quit now */
+ set_quit(true);
+ }
+ return (true);
+}
+
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpCanQuit::clone(){
+
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpCanQuit();
+
+ return new_msg;
+}
+
+/*************************
+ update traffic message
+ ************************/
+bool
+TrexStatelessDpUpdate::handle(TrexStatelessDpCore *dp_core) {
+ dp_core->update_traffic(m_port_id, m_factor);
+
+ return true;
+}
+
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpUpdate::clone() {
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpUpdate(m_port_id, m_factor);
+
+ return new_msg;
+}
+
+/************************* messages from DP to CP **********************/
+bool
+TrexDpPortEventMsg::handle() {
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(m_port_id);
+ port->get_dp_events().handle_event(m_event_type, m_thread_id, m_event_id);
+
+ return (true);
+}
+
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
new file mode 100644
index 00000000..d56596bf
--- /dev/null
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -0,0 +1,319 @@
+/*
+ Itay Marom
+ Hanoch Haim
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#ifndef __TREX_STATELESS_MESSAGING_H__
+#define __TREX_STATELESS_MESSAGING_H__
+
+#include <msg_manager.h>
+#include <trex_dp_port_events.h>
+
+class TrexStatelessDpCore;
+class TrexStreamsCompiledObj;
+class CFlowGenListPerThread;
+
+/**
+ * defines the base class for CP to DP messages
+ *
+ * @author imarom (27-Oct-15)
+ */
+class TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessCpToDpMsgBase() {
+ m_quit_scheduler=false;
+ }
+
+ virtual ~TrexStatelessCpToDpMsgBase() {
+ }
+
+
+ virtual bool handle(TrexStatelessDpCore *dp_core) = 0;
+
+ /**
+ * clone the current message
+ *
+ */
+ virtual TrexStatelessCpToDpMsgBase * clone() = 0;
+
+ /* do we want to quit scheduler, can be set by handle function */
+ void set_quit(bool enable){
+ m_quit_scheduler=enable;
+ }
+
+ bool is_quit(){
+ return ( m_quit_scheduler);
+ }
+
+ /* this node is called from scheduler in case the node is free */
+ virtual void on_node_remove(){
+ }
+
+ /* no copy constructor */
+ TrexStatelessCpToDpMsgBase(TrexStatelessCpToDpMsgBase &) = delete;
+
+protected:
+ int m_event_id;
+ bool m_quit_scheduler;
+};
+
+/**
+ * a message to start traffic
+ *
+ * @author imarom (27-Oct-15)
+ */
+class TrexStatelessDpStart : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpStart(uint8_t m_port_id, int m_event_id, TrexStreamsCompiledObj *obj, double duration);
+
+ ~TrexStatelessDpStart();
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+private:
+
+ uint8_t m_port_id;
+ int m_event_id;
+ TrexStreamsCompiledObj *m_obj;
+ double m_duration;
+
+};
+
+class TrexStatelessDpPause : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpPause(uint8_t port_id) : m_port_id(port_id) {
+ }
+
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+
+private:
+ uint8_t m_port_id;
+};
+
+
+class TrexStatelessDpResume : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpResume(uint8_t port_id) : m_port_id(port_id) {
+ }
+
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+
+private:
+ uint8_t m_port_id;
+};
+
+
+/**
+ * a message to stop traffic
+ *
+ * @author imarom (27-Oct-15)
+ */
+class TrexStatelessDpStop : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpStop(uint8_t port_id) : m_port_id(port_id) {
+ m_stop_only_for_event_id=false;
+ m_event_id=0;
+ m_core = NULL;
+ }
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+ void set_core_ptr(CFlowGenListPerThread * core){
+ m_core = core;
+ }
+
+ CFlowGenListPerThread * get_core_ptr(){
+ return ( m_core);
+ }
+
+
+ void set_event_id(int event_id){
+ m_event_id = event_id;
+ }
+
+ void set_wait_for_event_id(bool wait){
+ m_stop_only_for_event_id = wait;
+ }
+
+ virtual void on_node_remove();
+
+
+ bool get_is_stop_by_event_id(){
+ return (m_stop_only_for_event_id);
+ }
+
+ int get_event_id(){
+ return (m_event_id);
+ }
+
+private:
+ uint8_t m_port_id;
+ bool m_stop_only_for_event_id;
+ int m_event_id;
+ CFlowGenListPerThread * m_core ;
+
+};
+
+/**
+ * a message to Quit the datapath traffic. support only stateless for now
+ *
+ * @author hhaim
+ */
+class TrexStatelessDpQuit : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpQuit() {
+ }
+
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+};
+
+/**
+ * a message to check if both port are idel and exit
+ *
+ * @author hhaim
+ */
+class TrexStatelessDpCanQuit : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpCanQuit() {
+ }
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+};
+
+
+/**
+ * update message
+ */
+class TrexStatelessDpUpdate : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpUpdate(uint8_t port_id, double factor) {
+ m_port_id = port_id;
+ m_factor = factor;
+ }
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+private:
+ uint8_t m_port_id;
+ double m_factor;
+};
+
+
+/************************* messages from DP to CP **********************/
+
+/**
+ * defines the base class for CP to DP messages
+ *
+ * @author imarom (27-Oct-15)
+ */
+class TrexStatelessDpToCpMsgBase {
+public:
+
+ TrexStatelessDpToCpMsgBase() {
+ }
+
+ virtual ~TrexStatelessDpToCpMsgBase() {
+ }
+
+ /**
+ * virtual function to handle a message
+ *
+ */
+ virtual bool handle() = 0;
+
+ /* no copy constructor */
+ TrexStatelessDpToCpMsgBase(TrexStatelessDpToCpMsgBase &) = delete;
+
+};
+
+
+/**
+ * a message indicating an event has happened on a port at the
+ * DP
+ *
+ */
+class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase {
+public:
+
+ TrexDpPortEventMsg(int thread_id, uint8_t port_id, TrexDpPortEvent::event_e type, int event_id) {
+ m_thread_id = thread_id;
+ m_port_id = port_id;
+ m_event_type = type;
+ m_event_id = event_id;
+ }
+
+ virtual bool handle();
+
+ int get_thread_id() {
+ return m_thread_id;
+ }
+
+ uint8_t get_port_id() {
+ return m_port_id;
+ }
+
+ TrexDpPortEvent::event_e get_event_type() {
+ return m_event_type;
+ }
+
+ int get_event_id() {
+ return m_event_id;
+ }
+
+private:
+ int m_thread_id;
+ uint8_t m_port_id;
+ TrexDpPortEvent::event_e m_event_type;
+ int m_event_id;
+
+};
+
+#endif /* __TREX_STATELESS_MESSAGING_H__ */
+