summaryrefslogtreecommitdiffstats
path: root/src/stateless
diff options
context:
space:
mode:
Diffstat (limited to 'src/stateless')
-rw-r--r--src/stateless/cp/trex_stateless.cpp162
-rw-r--r--src/stateless/cp/trex_stateless.h74
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp143
-rw-r--r--src/stateless/cp/trex_stateless_port.h76
-rw-r--r--src/stateless/cp/trex_stream.cpp17
-rw-r--r--src/stateless/cp/trex_stream.h17
-rw-r--r--src/stateless/cp/trex_streams_compiler.cpp81
-rw-r--r--src/stateless/cp/trex_streams_compiler.h68
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp210
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h105
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp53
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h79
12 files changed, 665 insertions, 420 deletions
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp
index 72762e26..e0e95450 100644
--- a/src/stateless/cp/trex_stateless.cpp
+++ b/src/stateless/cp/trex_stateless.cpp
@@ -31,55 +31,58 @@ using namespace std;
* Trex stateless object
*
**********************************************************/
-TrexStateless::TrexStateless() {
- m_is_configured = false;
-}
-
/**
- * configure the singleton stateless object
*
*/
-void TrexStateless::configure(const TrexStatelessCfg &cfg) {
-
- TrexStateless& instance = get_instance_internal();
-
- /* check status */
- if (instance.m_is_configured) {
- throw TrexException("re-configuration of stateless object is not allowed");
- }
+TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) {
/* create RPC servers */
/* set both servers to mutex each other */
- instance.m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_rpc_async_cfg, &instance.m_global_cp_lock);
- instance.m_rpc_server->set_verbose(cfg.m_rpc_server_verbose);
+ m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_rpc_async_cfg, &m_global_cp_lock);
+ m_rpc_server->set_verbose(cfg.m_rpc_server_verbose);
/* configure ports */
+ m_port_count = cfg.m_port_count;
- instance.m_port_count = cfg.m_port_count;
-
- for (int i = 0; i < instance.m_port_count; i++) {
- instance.m_ports.push_back(new TrexStatelessPort(i));
+ for (int i = 0; i < m_port_count; i++) {
+ m_ports.push_back(new TrexStatelessPort(i));
}
- /* cores */
- instance.m_dp_core_count = cfg.m_dp_core_count;
- for (int i = 0; i < instance.m_dp_core_count; i++) {
- instance.m_dp_cores.push_back(new TrexStatelessDpCore(i));
+ m_platform_api = cfg.m_platform_api;
+}
+
+/**
+ * release all memory
+ *
+ * @author imarom (08-Oct-15)
+ */
+TrexStateless::~TrexStateless() {
+
+ /* release memory for ports */
+ for (auto port : m_ports) {
+ delete port;
}
+ m_ports.clear();
+
+ /* stops the RPC server */
+ m_rpc_server->stop();
+ delete m_rpc_server;
+
+ m_rpc_server = NULL;
- /* done */
- instance.m_is_configured = true;
+ delete m_platform_api;
+ m_platform_api = NULL;
}
+
/**
* starts the control plane side
*
*/
void
TrexStateless::launch_control_plane() {
- //std::cout << "\n on control/master core \n";
/* pin this process to the current running CPU
any new thread will be called on the same CPU
@@ -94,39 +97,6 @@ TrexStateless::launch_control_plane() {
m_rpc_server->start();
}
-void
-TrexStateless::launch_on_dp_core(uint8_t core_id) {
- m_dp_cores[core_id - 1]->run();
-}
-
-/**
- * destroy the singleton and release all memory
- *
- * @author imarom (08-Oct-15)
- */
-void
-TrexStateless::destroy() {
- TrexStateless& instance = get_instance_internal();
-
- if (!instance.m_is_configured) {
- return;
- }
-
- /* release memory for ports */
- for (auto port : instance.m_ports) {
- delete port;
- }
- instance.m_ports.clear();
-
- /* stops the RPC server */
- instance.m_rpc_server->stop();
- delete instance.m_rpc_server;
-
- instance.m_rpc_server = NULL;
-
- /* done */
- instance.m_is_configured = false;
-}
/**
* fetch a port by ID
@@ -148,57 +118,32 @@ TrexStateless::get_port_count() {
uint8_t
TrexStateless::get_dp_core_count() {
- return m_dp_core_count;
-}
-
-void
-TrexStateless::update_stats() {
-
- /* update CPU util.
- TODO
- */
- m_stats.m_stats.m_cpu_util = 0;
-
- /* for every port update and accumulate */
- for (uint8_t i = 0; i < m_port_count; i++) {
- m_ports[i]->update_stats();
-
- const TrexPortStats & port_stats = m_ports[i]->get_stats();
-
- m_stats.m_stats.m_tx_bps += port_stats.m_stats.m_tx_bps;
- m_stats.m_stats.m_rx_bps += port_stats.m_stats.m_rx_bps;
-
- m_stats.m_stats.m_tx_pps += port_stats.m_stats.m_tx_pps;
- m_stats.m_stats.m_rx_pps += port_stats.m_stats.m_rx_pps;
-
- m_stats.m_stats.m_total_tx_pkts += port_stats.m_stats.m_total_tx_pkts;
- m_stats.m_stats.m_total_rx_pkts += port_stats.m_stats.m_total_rx_pkts;
-
- m_stats.m_stats.m_total_tx_bytes += port_stats.m_stats.m_total_tx_bytes;
- m_stats.m_stats.m_total_rx_bytes += port_stats.m_stats.m_total_rx_bytes;
-
- m_stats.m_stats.m_tx_rx_errors += port_stats.m_stats.m_tx_rx_errors;
- }
+ return m_platform_api->get_dp_core_count();
}
void
TrexStateless::encode_stats(Json::Value &global) {
- global["cpu_util"] = m_stats.m_stats.m_cpu_util;
+ const TrexPlatformApi *api = get_stateless_obj()->get_platform_api();
+
+ TrexPlatformGlobalStats stats;
+ api->get_global_stats(stats);
- global["tx_bps"] = m_stats.m_stats.m_tx_bps;
- global["rx_bps"] = m_stats.m_stats.m_rx_bps;
+ global["cpu_util"] = stats.m_stats.m_cpu_util;
- global["tx_pps"] = m_stats.m_stats.m_tx_pps;
- global["rx_pps"] = m_stats.m_stats.m_rx_pps;
+ global["tx_bps"] = stats.m_stats.m_tx_bps;
+ global["rx_bps"] = stats.m_stats.m_rx_bps;
- global["total_tx_pkts"] = Json::Value::UInt64(m_stats.m_stats.m_total_tx_pkts);
- global["total_rx_pkts"] = Json::Value::UInt64(m_stats.m_stats.m_total_rx_pkts);
+ global["tx_pps"] = stats.m_stats.m_tx_pps;
+ global["rx_pps"] = stats.m_stats.m_rx_pps;
- global["total_tx_bytes"] = Json::Value::UInt64(m_stats.m_stats.m_total_tx_bytes);
- global["total_rx_bytes"] = Json::Value::UInt64(m_stats.m_stats.m_total_rx_bytes);
+ global["total_tx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_tx_pkts);
+ global["total_rx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_rx_pkts);
- global["tx_rx_errors"] = Json::Value::UInt64(m_stats.m_stats.m_tx_rx_errors);
+ global["total_tx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_tx_bytes);
+ global["total_rx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_rx_bytes);
+
+ global["tx_rx_errors"] = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors);
for (uint8_t i = 0; i < m_port_count; i++) {
std::stringstream ss;
@@ -210,3 +155,20 @@ TrexStateless::encode_stats(Json::Value &global) {
}
}
+/**
+ * generate a snapshot for publish (async publish)
+ *
+ */
+void
+TrexStateless::generate_publish_snapshot(std::string &snapshot) {
+ Json::FastWriter writer;
+ Json::Value root;
+
+ root["name"] = "trex-stateless-info";
+ root["type"] = 0;
+
+ /* stateless specific info goes here */
+ root["data"] = Json::nullValue;
+
+ snapshot = writer.write(root);
+}
diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h
index 649b25dd..758707a2 100644
--- a/src/stateless/cp/trex_stateless.h
+++ b/src/stateless/cp/trex_stateless.h
@@ -29,9 +29,10 @@ limitations under the License.
#include <trex_stream.h>
#include <trex_stateless_port.h>
-#include <trex_stateless_dp_core.h>
#include <trex_rpc_server_api.h>
+#include <internal_api/trex_platform_api.h>
+
/**
* generic exception for errors
* TODO: move this to a better place
@@ -88,17 +89,17 @@ public:
/* default values */
TrexStatelessCfg() {
m_port_count = 0;
- m_dp_core_count = 0;
m_rpc_req_resp_cfg = NULL;
m_rpc_async_cfg = NULL;
- m_rpc_server_verbose = false;
+ m_rpc_server_verbose = false;
+ m_platform_api = NULL;
}
const TrexRpcServerConfig *m_rpc_req_resp_cfg;
const TrexRpcServerConfig *m_rpc_async_cfg;
+ const TrexPlatformApi *m_platform_api;
bool m_rpc_server_verbose;
uint8_t m_port_count;
- uint8_t m_dp_core_count;
};
/**
@@ -113,27 +114,8 @@ public:
* reconfiguration is not allowed
* an exception will be thrown
*/
- static void configure(const TrexStatelessCfg &cfg);
-
- /**
- * destroy the instance
- *
- */
- static void destroy();
-
- /**
- * singleton public get instance
- *
- */
- static TrexStateless& get_instance() {
- TrexStateless& instance = get_instance_internal();
-
- if (!instance.m_is_configured) {
- throw TrexException("object is not configured");
- }
-
- return instance;
- }
+ TrexStateless(const TrexStatelessCfg &cfg);
+ ~TrexStateless();
/**
* starts the control plane side
@@ -152,12 +134,6 @@ public:
uint8_t get_dp_core_count();
- /**
- * update all the stats (deep update)
- * (include all the ports and global stats)
- *
- */
- void update_stats();
/**
* fetch all the stats
@@ -165,22 +141,21 @@ public:
*/
void encode_stats(Json::Value &global);
+ /**
+ * generate a snapshot for publish
+ */
+ void generate_publish_snapshot(std::string &snapshot);
-protected:
- TrexStateless();
-
- static TrexStateless& get_instance_internal () {
- static TrexStateless instance;
- return instance;
+ const TrexPlatformApi * get_platform_api() {
+ return (m_platform_api);
}
- /* c++ 2011 style singleton */
+protected:
+
+ /* no copy or assignment */
TrexStateless(TrexStateless const&) = delete;
void operator=(TrexStateless const&) = delete;
- /* status */
- bool m_is_configured;
-
/* RPC server array */
TrexRpcServer *m_rpc_server;
@@ -188,15 +163,20 @@ protected:
std::vector <TrexStatelessPort *> m_ports;
uint8_t m_port_count;
- /* cores */
- std::vector <TrexStatelessDpCore *> m_dp_cores;
- uint8_t m_dp_core_count;
-
- /* stats */
- TrexStatelessStats m_stats;
+ /* platform API */
+ const TrexPlatformApi *m_platform_api;
std::mutex m_global_cp_lock;
};
+/**
+ * an anchor function
+ *
+ * @author imarom (25-Oct-15)
+ *
+ * @return TrexStateless&
+ */
+TrexStateless * get_stateless_obj();
+
#endif /* __TREX_STATELESS_H__ */
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index a31847a5..375d1f63 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -18,8 +18,12 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
+
#include <trex_stateless.h>
#include <trex_stateless_port.h>
+#include <trex_stateless_messaging.h>
+#include <trex_streams_compiler.h>
+
#include <string>
#ifndef TREX_RPC_MOCK_SERVER
@@ -59,19 +63,52 @@ TrexStatelessPort::start_traffic(void) {
return (RC_ERR_NO_STREAMS);
}
+ /* fetch all the streams from the table */
+ vector<TrexStream *> streams;
+ get_stream_table()->get_object_list(streams);
+
+ /* compiler it */
+ TrexStreamsCompiler compiler;
+ TrexStreamsCompiledObj *compiled_obj = new TrexStreamsCompiledObj();
+
+ bool rc = compiler.compile(streams, *compiled_obj);
+ if (!rc) {
+ return (RC_ERR_FAILED_TO_COMPILE_STREAMS);
+ }
+
+ /* generate a message to all the relevant DP cores to start transmitting */
+ TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(compiled_obj);
+
+ // FIXME (add the right core list)
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(0);
+
+ ring->Enqueue((CGenNode *)start_msg);
+
+ /* move the state to transmiting */
m_port_state = PORT_STATE_TRANSMITTING;
- /* real code goes here */
return (RC_OK);
}
-void
+TrexStatelessPort::rc_e
TrexStatelessPort::stop_traffic(void) {
/* real code goes here */
- if (m_port_state == PORT_STATE_TRANSMITTING) {
- m_port_state = PORT_STATE_UP_IDLE;
+ if (m_port_state != PORT_STATE_TRANSMITTING) {
+ return (RC_ERR_BAD_STATE_FOR_OP);
}
+
+ /* generate a message to all the relevant DP cores to start transmitting */
+ TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop();
+
+ // FIXME (add the right core list)
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(0);
+
+ ring->Enqueue((CGenNode *)stop_msg);
+
+ m_port_state = PORT_STATE_UP_IDLE;
+
+ return (RC_OK);
}
/**
@@ -130,99 +167,27 @@ TrexStatelessPort::generate_handler() {
return (ss.str());
}
-/**
- * update stats for the port
- *
- */
-void
-TrexStatelessPort::update_stats() {
- struct rte_eth_stats stats;
- rte_eth_stats_get(m_port_id, &stats);
-
- /* copy straight values */
- m_stats.m_stats.m_total_tx_bytes = stats.obytes;
- m_stats.m_stats.m_total_rx_bytes = stats.ibytes;
-
- m_stats.m_stats.m_total_tx_pkts = stats.opackets;
- m_stats.m_stats.m_total_rx_pkts = stats.ipackets;
-
- /* calculate stats */
- m_stats.m_stats.m_tx_bps = m_stats.m_bw_tx_bps.add(stats.obytes);
- m_stats.m_stats.m_rx_bps = m_stats.m_bw_rx_bps.add(stats.ibytes);
-
- m_stats.m_stats.m_tx_pps = m_stats.m_bw_tx_pps.add(stats.opackets);
- m_stats.m_stats.m_rx_pps = m_stats.m_bw_rx_pps.add(stats.ipackets);
-
-}
-
-const TrexPortStats &
-TrexStatelessPort::get_stats() {
- return m_stats;
-}
void
TrexStatelessPort::encode_stats(Json::Value &port) {
- port["tx_bps"] = m_stats.m_stats.m_tx_bps;
- port["rx_bps"] = m_stats.m_stats.m_rx_bps;
-
- port["tx_pps"] = m_stats.m_stats.m_tx_pps;
- port["rx_pps"] = m_stats.m_stats.m_rx_pps;
-
- port["total_tx_pkts"] = Json::Value::UInt64(m_stats.m_stats.m_total_tx_pkts);
- port["total_rx_pkts"] = Json::Value::UInt64(m_stats.m_stats.m_total_rx_pkts);
-
- port["total_tx_bytes"] = Json::Value::UInt64(m_stats.m_stats.m_total_tx_bytes);
- port["total_rx_bytes"] = Json::Value::UInt64(m_stats.m_stats.m_total_rx_bytes);
-
- port["tx_rx_errors"] = Json::Value::UInt64(m_stats.m_stats.m_tx_rx_errors);
-}
-
+ const TrexPlatformApi *api = get_stateless_obj()->get_platform_api();
+ TrexPlatformInterfaceStats stats;
+ api->get_interface_stats(m_port_id, stats);
-/***************************
- * BW measurement
- *
- **************************/
-/* TODO: move this to a common place */
-BWMeasure::BWMeasure() {
- reset();
-}
-
-void BWMeasure::reset(void) {
- m_start=false;
- m_last_time_msec=0;
- m_last_bytes=0;
- m_last_result=0.0;
-};
-
-double BWMeasure::calc_MBsec(uint32_t dtime_msec,
- uint64_t dbytes){
- double rate=0.000008*( ( (double)dbytes*(double)os_get_time_freq())/((double)dtime_msec) );
- return(rate);
-}
+ port["tx_bps"] = stats.m_stats.m_tx_bps;
+ port["rx_bps"] = stats.m_stats.m_rx_bps;
-double BWMeasure::add(uint64_t size) {
- if ( false == m_start ) {
- m_start=true;
- m_last_time_msec = os_get_time_msec() ;
- m_last_bytes=size;
- return(0.0);
- }
-
- uint32_t ctime=os_get_time_msec();
- if ((ctime - m_last_time_msec) <os_get_time_freq() ) {
- return(m_last_result);
- }
+ port["tx_pps"] = stats.m_stats.m_tx_pps;
+ port["rx_pps"] = stats.m_stats.m_rx_pps;
- uint32_t dtime_msec = ctime-m_last_time_msec;
- uint64_t dbytes = size - m_last_bytes;
+ port["total_tx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_tx_pkts);
+ port["total_rx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_rx_pkts);
- m_last_time_msec = ctime;
- m_last_bytes = size;
-
- m_last_result= 0.5*calc_MBsec(dtime_msec,dbytes) +0.5*(m_last_result);
- return( m_last_result );
+ port["total_tx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_tx_bytes);
+ port["total_rx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_rx_bytes);
+
+ port["tx_rx_errors"] = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors);
}
-
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 428d5aee..4851a4b5 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -24,71 +24,6 @@ limitations under the License.
#include <trex_stream.h>
/**
- * bandwidth measurement class
- *
- */
-class BWMeasure {
-public:
- BWMeasure();
- void reset(void);
- double add(uint64_t size);
-
-private:
- double calc_MBsec(uint32_t dtime_msec,
- uint64_t dbytes);
-
-public:
- bool m_start;
- uint32_t m_last_time_msec;
- uint64_t m_last_bytes;
- double m_last_result;
-};
-
-/**
- * TRex stateless port stats
- *
- * @author imarom (24-Sep-15)
- */
-class TrexPortStats {
-
-public:
- TrexPortStats() {
- m_stats = {0};
-
- m_bw_tx_bps.reset();
- m_bw_rx_bps.reset();
-
- m_bw_tx_pps.reset();
- m_bw_rx_pps.reset();
- }
-
-public:
-
- BWMeasure m_bw_tx_bps;
- BWMeasure m_bw_rx_bps;
-
- BWMeasure m_bw_tx_pps;
- BWMeasure m_bw_rx_pps;
-
- struct {
-
- double m_tx_bps;
- double m_rx_bps;
-
- double m_tx_pps;
- double m_rx_pps;
-
- uint64_t m_total_tx_pkts;
- uint64_t m_total_rx_pkts;
-
- uint64_t m_total_tx_bytes;
- uint64_t m_total_rx_bytes;
-
- uint64_t m_tx_rx_errors;
- } m_stats;
-};
-
-/**
* describes a stateless port
*
* @author imarom (31-Aug-15)
@@ -127,7 +62,7 @@ public:
* stop traffic
*
*/
- void stop_traffic(void);
+ rc_e stop_traffic(void);
/**
* access the stream table
@@ -203,14 +138,6 @@ public:
}
/**
- * update the values of the stats
- *
- */
- void update_stats();
-
- const TrexPortStats & get_stats();
-
- /**
* encode stats as JSON
*/
void encode_stats(Json::Value &port);
@@ -224,7 +151,6 @@ private:
port_state_e m_port_state;
std::string m_owner;
std::string m_owner_handler;
- TrexPortStats m_stats;
};
#endif /* __TREX_STATELESS_PORT_H__ */
diff --git a/src/stateless/cp/trex_stream.cpp b/src/stateless/cp/trex_stream.cpp
index 182036f1..ba306137 100644
--- a/src/stateless/cp/trex_stream.cpp
+++ b/src/stateless/cp/trex_stream.cpp
@@ -20,6 +20,7 @@ limitations under the License.
*/
#include <trex_stream.h>
#include <cstddef>
+#include <string.h>
/**************************************
* stream
@@ -103,14 +104,24 @@ TrexStream * TrexStreamTable::get_stream_by_id(uint32_t stream_id) {
}
}
-void TrexStreamTable::get_stream_list(std::vector<uint32_t> &stream_list) {
- stream_list.clear();
+void TrexStreamTable::get_id_list(std::vector<uint32_t> &id_list) {
+ id_list.clear();
for (auto stream : m_stream_table) {
- stream_list.push_back(stream.first);
+ id_list.push_back(stream.first);
}
}
+void TrexStreamTable::get_object_list(std::vector<TrexStream *> &object_list) {
+ object_list.clear();
+
+ for (auto stream : m_stream_table) {
+ object_list.push_back(stream.second);
+ }
+
+}
+
int TrexStreamTable::size() {
return m_stream_table.size();
}
+
diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h
index f5bc96ef..c8a15240 100644
--- a/src/stateless/cp/trex_stream.h
+++ b/src/stateless/cp/trex_stream.h
@@ -37,10 +37,6 @@ class TrexRpcCmdAddStream;
*
*/
class TrexStream {
- /* provide the RPC parser a way to access private fields */
- friend class TrexRpcCmdAddStream;
- friend class TrexRpcCmdGetStream;
- friend class TrexStreamTable;
public:
TrexStream(uint8_t port_id, uint32_t stream_id);
@@ -56,7 +52,7 @@ public:
/* access the stream json */
const Json::Value & get_stream_json();
-protected:
+public:
/* basic */
uint8_t m_port_id;
uint32_t m_stream_id;
@@ -189,7 +185,13 @@ public:
*
* @param stream_list
*/
- void get_stream_list(std::vector<uint32_t> &stream_list);
+ void get_id_list(std::vector<uint32_t> &id_list);
+
+ /**
+ * populate a list with all the stream objects
+ *
+ */
+ void get_object_list(std::vector<TrexStream *> &object_list);
/**
* get the table size
@@ -197,6 +199,9 @@ public:
*/
int size();
+ std::unordered_map<int, TrexStream *>::iterator begin() {return m_stream_table.begin();}
+ std::unordered_map<int, TrexStream *>::iterator end() {return m_stream_table.end();}
+
private:
/**
* holds all the stream in a hash table by stream id
diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp
new file mode 100644
index 00000000..6c77ad0f
--- /dev/null
+++ b/src/stateless/cp/trex_streams_compiler.cpp
@@ -0,0 +1,81 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include <string.h>
+#include <trex_streams_compiler.h>
+#include <trex_stream.h>
+
+/**************************************
+ * stream compiled object
+ *************************************/
+TrexStreamsCompiledObj::~TrexStreamsCompiledObj() {
+ for (auto &obj : m_objs) {
+ delete obj.m_pkt;
+ }
+ m_objs.clear();
+}
+
+void
+TrexStreamsCompiledObj::add_compiled_stream(double pps, uint8_t *pkt, uint16_t pkt_len) {
+ obj_st obj;
+
+ obj.m_pps = pps;
+ obj.m_pkt_len = pkt_len;
+
+ obj.m_pkt = new uint8_t[pkt_len];
+ memcpy(obj.m_pkt, pkt, pkt_len);
+
+ m_objs.push_back(obj);
+}
+
+/**************************************
+ * stream compiler
+ *************************************/
+bool
+TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj) {
+ /* for now we do something trivial, */
+ for (auto stream : streams) {
+
+ /* skip non-enabled streams */
+ if (!stream->m_enabled) {
+ continue;
+ }
+
+ /* for now skip also non self started streams */
+ if (!stream->m_self_start) {
+ continue;
+ }
+
+ /* for now support only continous ... */
+ TrexStreamContinuous *cont_stream = dynamic_cast<TrexStreamContinuous *>(stream);
+ if (!cont_stream) {
+ continue;
+ }
+
+ /* add it */
+ obj.add_compiled_stream(cont_stream->get_pps(),
+ cont_stream->m_pkt.binary,
+ cont_stream->m_pkt.len);
+ }
+
+ return true;
+}
+
diff --git a/src/stateless/cp/trex_streams_compiler.h b/src/stateless/cp/trex_streams_compiler.h
new file mode 100644
index 00000000..90253cdf
--- /dev/null
+++ b/src/stateless/cp/trex_streams_compiler.h
@@ -0,0 +1,68 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#ifndef __TREX_STREAMS_COMPILER_H__
+#define __TREX_STREAMS_COMPILER_H__
+
+#include <stdint.h>
+#include <vector>
+
+class TrexStreamsCompiler;
+class TrexStream;
+
+/**
+ * compiled object for a table of streams
+ *
+ * @author imarom (28-Oct-15)
+ */
+class TrexStreamsCompiledObj {
+ friend class TrexStreamsCompiler;
+public:
+
+ TrexStreamsCompiledObj() {}
+ ~TrexStreamsCompiledObj();
+
+ struct obj_st {
+ double m_pps;
+ uint8_t *m_pkt;
+ uint16_t m_pkt_len;
+ };
+
+ const std::vector<obj_st> & get_objects() {
+ return m_objs;
+ }
+
+private:
+ void add_compiled_stream(double pps, uint8_t *pkt, uint16_t pkt_len);
+ std::vector<obj_st> m_objs;
+};
+
+class TrexStreamsCompiler {
+public:
+ /**
+ * compiles a vector of streams to an object passable to the DP
+ *
+ * @author imarom (28-Oct-15)
+ *
+ */
+ bool compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj);
+};
+
+#endif /* __TREX_STREAMS_COMPILER_H__ */
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index 3755b82c..73387f0e 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -18,118 +18,148 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
-
#include <trex_stateless_dp_core.h>
-#include <stdio.h>
-#include <unistd.h>
-#include <trex_stateless.h>
+#include <trex_stateless_messaging.h>
+#include <trex_streams_compiler.h>
#include <bp_sim.h>
-#ifndef TREX_RPC_MOCK_SERVER
+/**
+ * extended info for the stateless node
+ * TODO:
+ * static_assert(sizeof(dp_node_extended_info_st) <= sizeof(CGenNodeStateless::m_pad_end), "hello");
+ */
+typedef struct dp_node_extended_info_ {
+ double next_time_offset;
+ uint8_t is_stream_active;
-// DPDK c++ issue
-#define UINT8_MAX 255
-#define UINT16_MAX 0xFFFF
-// DPDK c++ issue
-#endif
+} dp_node_extended_info_st;
-#include <rte_ethdev.h>
-#include "mbuf.h"
+TrexStatelessDpCore::TrexStatelessDpCore(uint8_t thread_id, CFlowGenListPerThread *core) {
+ m_thread_id = thread_id;
+ m_core = core;
-/**
- * TEST
- *
- */
-static const uint8_t udp_pkt[]={
- 0x00,0x00,0x00,0x01,0x00,0x00,
- 0x00,0x00,0x00,0x01,0x00,0x00,
- 0x08,0x00,
-
- 0x45,0x00,0x00,0x81,
- 0xaf,0x7e,0x00,0x00,
- 0x12,0x11,0xd9,0x23,
- 0x01,0x01,0x01,0x01,
- 0x3d,0xad,0x72,0x1b,
-
- 0x11,0x11,
- 0x11,0x11,
-
- 0x00,0x6d,
- 0x00,0x00,
-
- 0x64,0x31,0x3a,0x61,
- 0x64,0x32,0x3a,0x69,0x64,
- 0x32,0x30,0x3a,0xd0,0x0e,
- 0xa1,0x4b,0x7b,0xbd,0xbd,
- 0x16,0xc6,0xdb,0xc4,0xbb,0x43,
- 0xf9,0x4b,0x51,0x68,0x33,0x72,
- 0x20,0x39,0x3a,0x69,0x6e,0x66,0x6f,
- 0x5f,0x68,0x61,0x73,0x68,0x32,0x30,0x3a,0xee,0xc6,0xa3,
- 0xd3,0x13,0xa8,0x43,0x06,0x03,0xd8,0x9e,0x3f,0x67,0x6f,
- 0xe7,0x0a,0xfd,0x18,0x13,0x8d,0x65,0x31,0x3a,0x71,0x39,
- 0x3a,0x67,0x65,0x74,0x5f,0x70,0x65,0x65,0x72,0x73,0x31,
- 0x3a,0x74,0x38,0x3a,0x3d,0xeb,0x0c,0xbf,0x0d,0x6a,0x0d,
- 0xa5,0x31,0x3a,0x79,0x31,0x3a,0x71,0x65,0x87,0xa6,0x7d,
- 0xe7
-};
-
-static int
-test_inject_pkt(uint8_t *pkt, uint32_t pkt_size) {
-
- #ifndef TREX_RPC_MOCK_SERVER
- rte_mempool_t * mp= CGlobalInfo::m_mem_pool[0].m_big_mbuf_pool ;
- #else
- rte_mempool_t * mp = NULL;
- #endif
-
- rte_mbuf_t *m = rte_pktmbuf_alloc(mp);
- if ( unlikely(m==0) ) {
- printf("ERROR no packets \n");
- return (-1);
+ CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
+
+ m_ring_from_cp = cp_dp->getRingCpToDp(thread_id);
+ m_ring_to_cp = cp_dp->getRingDpToCp(thread_id);
+
+ m_state = STATE_IDLE;
+}
+
+void
+TrexStatelessDpCore::start() {
+
+ /* creates a maintenace job using the scheduler */
+ CGenNode * node_sync = m_core->create_node() ;
+ node_sync->m_type = CGenNode::FLOW_SYNC;
+ node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT;
+ m_core->m_node_gen.add_node(node_sync);
+
+ double old_offset = 0.0;
+ m_core->m_node_gen.flush_file(100000000, 0.0, false, m_core, old_offset);
+
+}
+
+void
+TrexStatelessDpCore::handle_pkt_event(CGenNode *node) {
+
+ //TODO: optimize the fast path here...
+
+ CGenNodeStateless *node_sl = (CGenNodeStateless *)node;
+ dp_node_extended_info_st *opaque = (dp_node_extended_info_st *)node_sl->get_opaque_storage();
+
+ /* is this stream active ? */
+ if (!opaque->is_stream_active) {
+ m_core->free_node(node);
+ return;
}
+
+ m_core->m_node_gen.m_v_if->send_node(node);
+
+ /* in case of continues */
+ node->m_time += opaque->next_time_offset;
+
+ /* insert a new event */
+ m_core->m_node_gen.m_p_queue.push(node);
+}
+
+void
+TrexStatelessDpCore::add_cont_stream(double pps, const uint8_t *pkt, uint16_t pkt_len) {
+ CGenNodeStateless *node = m_core->create_node_sl();
+
+ /* add periodic */
+ node->m_type = CGenNode::STATELESS_PKT;
+ node->m_time = m_core->m_cur_time_sec + 0.0 /* STREAM ISG */;
+ node->m_flags = 0;
+
+ /* set socket id */
+ node->set_socket_id(m_core->m_node_gen.m_socket_id);
+
+ /* build a mbuf from a packet */
+ uint16_t pkt_size = pkt_len;
+ const uint8_t *stream_pkt = pkt;
+
+ dp_node_extended_info_st *opaque = (dp_node_extended_info_st *)node->get_opaque_storage();
+ opaque->next_time_offset = 1.0 / pps;
+ opaque->is_stream_active = 1;
+
+ /* allocate const mbuf */
+ rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
+ assert(m);
+
char *p = rte_pktmbuf_append(m, pkt_size);
assert(p);
- /* set pkt data */
- memcpy(p,pkt,pkt_size);
+ /* copy the packet */
+ memcpy(p,stream_pkt,pkt_size);
- rte_mbuf_t *tx_pkts[32];
- tx_pkts[0] = m;
- uint8_t nb_pkts = 1;
- uint16_t ret = rte_eth_tx_burst(0, 0, tx_pkts, nb_pkts);
- (void)ret;
- rte_pktmbuf_free(m);
+ /* set dir 0 or 1 client or server */
+ pkt_dir_t dir = 0;
+ node->set_mbuf_cache_dir(dir);
- return (0);
-}
+ /* TBD repace the mac if req we should add flag */
+ m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, m);
-static int
-test_inject_udp_pkt(){
- return (test_inject_pkt((uint8_t*)udp_pkt,sizeof(udp_pkt)));
+ /* set the packet as a readonly */
+ node->set_cache_mbuf(m);
+
+ m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
+
+ /* keep track */
+ m_active_nodes.push_back(node);
+
+ /* schedule */
+ m_core->m_node_gen.add_node((CGenNode *)node);
}
void
-TrexStatelessDpCore::test_inject_dummy_pkt() {
- test_inject_udp_pkt();
+TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj) {
+ for (auto single_stream : obj->get_objects()) {
+ add_cont_stream(single_stream.m_pps, single_stream.m_pkt, single_stream.m_pkt_len);
+ }
}
-/***************************
- * DP core
- *
- **************************/
-TrexStatelessDpCore::TrexStatelessDpCore(uint8_t core_id) : m_core_id(core_id) {
+void
+TrexStatelessDpCore::stop_traffic() {
+ /* we cannot remove nodes not from the top of the queue so
+ for every active node - make sure next time
+ the scheduler invokes it, it will be free */
+ for (auto node : m_active_nodes) {
+ dp_node_extended_info_st *opaque = (dp_node_extended_info_st *)node->get_opaque_storage();
+ opaque->is_stream_active = 0;
+ }
+ m_active_nodes.clear();
+
+ m_state = STATE_IDLE;
}
/**
- * main function for DP core
+ * handle a message from CP to DP
*
*/
-void
-TrexStatelessDpCore::run() {
- printf("\nOn DP core %d\n", m_core_id);
- while (true) {
- test_inject_dummy_pkt();
- rte_pause();
- }
+void
+TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
+ msg->handle(this);
+ delete msg;
}
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index 4b09b752..d95f7eeb 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -21,23 +21,108 @@ limitations under the License.
#ifndef __TREX_STATELESS_DP_CORE_H__
#define __TREX_STATELESS_DP_CORE_H__
-#include <stdint.h>
+#include <vector>
+
+#include <msg_manager.h>
+#include <pal_utl.h>
+
+class TrexStatelessCpToDpMsgBase;
+class TrexStatelessDpStart;
+class CFlowGenListPerThread;
+class CGenNode;
+class TrexStreamsCompiledObj;
+class CGenNodeStateless;
-/**
- * stateless DP core object
- *
- */
class TrexStatelessDpCore {
+
public:
- TrexStatelessDpCore(uint8_t core_id);
+ /* states */
+ enum state_e {
+ STATE_IDLE,
+ STATE_TRANSMITTING
+ };
+
+ TrexStatelessDpCore(uint8_t thread_id, CFlowGenListPerThread *core);
+
+ /**
+ * launch the stateless DP core code
+ *
+ */
+ void start();
+
+ /**
+ * handle pkt event
+ *
+ * @author imarom (27-Oct-15)
+ */
+ void handle_pkt_event(CGenNode *node);
+
+ /**
+ * dummy traffic creator
+ *
+ * @author imarom (27-Oct-15)
+ *
+ * @param pkt
+ * @param pkt_len
+ */
+ void start_traffic(TrexStreamsCompiledObj *obj);
+
+ /**
+ * stop all traffic for this core
+ *
+ */
+ void stop_traffic();
+
+ /**
+ * check for and handle messages from CP
+ *
+ * @author imarom (27-Oct-15)
+ */
+ void periodic_check_for_cp_messages() {
+ // doing this inline for performance reasons
- /* starts the DP core run */
- void run();
+ /* fast path */
+ if ( likely ( m_ring_from_cp->isEmpty() ) ) {
+ return;
+ }
+
+ while ( true ) {
+ CGenNode * node = NULL;
+ if (m_ring_from_cp->Dequeue(node) != 0) {
+ break;
+ }
+ assert(node);
+
+ TrexStatelessCpToDpMsgBase * msg = (TrexStatelessCpToDpMsgBase *)node;
+ handle_cp_msg(msg);
+ }
+
+ }
private:
- void test_inject_dummy_pkt();
- uint8_t m_core_id;
+ /**
+ * handles a CP to DP message
+ *
+ * @author imarom (27-Oct-15)
+ *
+ * @param msg
+ */
+ void handle_cp_msg(TrexStatelessCpToDpMsgBase *msg);
+
+ void add_cont_stream(double pps, const uint8_t *pkt, uint16_t pkt_len);
+
+ uint8_t m_thread_id;
+ state_e m_state;
+ CNodeRing *m_ring_from_cp;
+ CNodeRing *m_ring_to_cp;
+
+ /* holds the current active nodes */
+ std::vector<CGenNodeStateless *> m_active_nodes;
+
+ /* pointer to the main object */
+ CFlowGenListPerThread *m_core;
};
#endif /* __TREX_STATELESS_DP_CORE_H__ */
+
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
new file mode 100644
index 00000000..3c6a5933
--- /dev/null
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -0,0 +1,53 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#include <trex_stateless_messaging.h>
+#include <trex_stateless_dp_core.h>
+#include <trex_streams_compiler.h>
+#include <string.h>
+
+/*************************
+ start traffic message
+ ************************/
+TrexStatelessDpStart::TrexStatelessDpStart(TrexStreamsCompiledObj *obj) : m_obj(obj) {
+}
+
+TrexStatelessDpStart::~TrexStatelessDpStart() {
+ if (m_obj) {
+ delete m_obj;
+ }
+}
+
+bool
+TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) {
+
+ dp_core->start_traffic(m_obj);
+ return true;
+}
+
+/*************************
+ stop traffic message
+ ************************/
+bool
+TrexStatelessDpStop::handle(TrexStatelessDpCore *dp_core) {
+ dp_core->stop_traffic();
+ return true;
+}
+
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
new file mode 100644
index 00000000..13f6c05a
--- /dev/null
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -0,0 +1,79 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#ifndef __TREX_STATELESS_MESSAGING_H__
+#define __TREX_STATELESS_MESSAGING_H__
+
+#include <msg_manager.h>
+
+class TrexStatelessDpCore;
+class TrexStreamsCompiledObj;
+
+/**
+ * defines the base class for CP to DP messages
+ *
+ * @author imarom (27-Oct-15)
+ */
+class TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessCpToDpMsgBase() {
+ }
+
+ virtual ~TrexStatelessCpToDpMsgBase() {
+ }
+
+ /**
+ * virtual function to handle a message
+ *
+ */
+ virtual bool handle(TrexStatelessDpCore *dp_core) = 0;
+};
+
+/**
+ * a message to start traffic
+ *
+ * @author imarom (27-Oct-15)
+ */
+class TrexStatelessDpStart : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpStart(TrexStreamsCompiledObj *obj);
+
+ ~TrexStatelessDpStart();
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+private:
+ TrexStreamsCompiledObj *m_obj;
+};
+
+/**
+ * a message to stop traffic
+ *
+ * @author imarom (27-Oct-15)
+ */
+class TrexStatelessDpStop : public TrexStatelessCpToDpMsgBase {
+public:
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+};
+
+
+#endif /* __TREX_STATELESS_MESSAGING_H__ */