summaryrefslogtreecommitdiffstats
path: root/src/stateless
diff options
context:
space:
mode:
Diffstat (limited to 'src/stateless')
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp401
-rw-r--r--src/stateless/cp/trex_stateless_port.h258
-rw-r--r--src/stateless/cp/trex_stream.cpp38
-rw-r--r--src/stateless/cp/trex_stream.h85
-rw-r--r--src/stateless/cp/trex_stream_vm.cpp804
-rw-r--r--src/stateless/cp/trex_stream_vm.h854
-rw-r--r--src/stateless/cp/trex_streams_compiler.cpp210
-rw-r--r--src/stateless/cp/trex_streams_compiler.h74
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp140
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h2
-rw-r--r--src/stateless/dp/trex_stream_node.h69
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp4
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h6
13 files changed, 2676 insertions, 269 deletions
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 8e18a5bf..3a64f8c5 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -57,7 +57,6 @@ TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api
m_port_id = port_id;
m_port_state = PORT_STATE_IDLE;
- clear_owner();
/* get the platform specific data */
api->get_interface_info(port_id, m_driver_name, m_speed);
@@ -85,18 +84,40 @@ TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api
* @param force
*/
void
-TrexStatelessPort::acquire(const std::string &user, bool force) {
- if ( (!is_free_to_aquire()) && (get_owner() != user) && (!force)) {
- throw TrexRpcException("port is already taken by '" + get_owner() + "'");
+TrexStatelessPort::acquire(const std::string &user, uint32_t session_id, bool force) {
+
+ /* if port is free - just take it */
+ if (get_owner().is_free()) {
+ get_owner().own(user);
+ return;
+ }
+
+ if (force) {
+ get_owner().own(user);
+
+ /* inform the other client of the steal... */
+ Json::Value data;
+
+ data["port_id"] = m_port_id;
+ data["who"] = user;
+ data["session_id"] = session_id;
+
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_FORCE_ACQUIRED, data);
+
+ } else {
+ /* not same user or session id and not force - report error */
+ if (get_owner().get_name() == user) {
+ throw TrexRpcException("port is already owned by another session of '" + user + "'");
+ } else {
+ throw TrexRpcException("port is already taken by '" + get_owner().get_name() + "'");
+ }
}
- set_owner(user);
}
void
TrexStatelessPort::release(void) {
- verify_state( ~(PORT_STATE_TX | PORT_STATE_PAUSE) );
- clear_owner();
+ get_owner().release();
}
/**
@@ -104,71 +125,71 @@ TrexStatelessPort::release(void) {
*
*/
void
-TrexStatelessPort::start_traffic(double mul, double duration) {
+TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration) {
/* command allowed only on state stream */
verify_state(PORT_STATE_STREAMS);
+ /* just making sure no leftovers... */
+ delete_streams_graph();
+
+ /* on start - we can only provide absolute values */
+ assert(mul.m_op == TrexPortMultiplier::OP_ABS);
+
+ double factor = calculate_effective_factor(mul);
+
/* fetch all the streams from the table */
vector<TrexStream *> streams;
get_object_list(streams);
- /* split it per core */
- double per_core_mul = mul / m_cores_id_list.size();
/* compiler it */
- TrexStreamsCompiler compiler;
- TrexStreamsCompiledObj *compiled_obj = new TrexStreamsCompiledObj(m_port_id, per_core_mul);
+ std::vector<TrexStreamsCompiledObj *> compiled_objs;
+ std::string fail_msg;
- bool rc = compiler.compile(streams, *compiled_obj);
+ TrexStreamsCompiler compiler;
+ bool rc = compiler.compile(m_port_id,
+ streams,
+ compiled_objs,
+ get_dp_core_count(),
+ factor,
+ &fail_msg);
if (!rc) {
- throw TrexRpcException("Failed to compile streams");
+ throw TrexRpcException(fail_msg);
}
/* generate a message to all the relevant DP cores to start transmitting */
int event_id = m_dp_events.generate_event_id();
+
/* mark that DP event of stoppped is possible */
m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, event_id);
- TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_obj, duration);
-
- m_last_all_streams_continues = compiled_obj->get_all_streams_continues();
- m_last_duration =duration;
+ /* update object status */
+ m_factor = factor;
+ m_last_all_streams_continues = compiled_objs[0]->get_all_streams_continues();
+ m_last_duration = duration;
change_state(PORT_STATE_TX);
- send_message_to_dp(start_msg);
+
+ /* update the DP - messages will be freed by the DP */
+ int index = 0;
+ for (auto core_id : m_cores_id_list) {
+
+ TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_objs[index], duration);
+ send_message_to_dp(core_id, start_msg);
+
+ index++;
+ }
+ /* update subscribers */
Json::Value data;
data["port_id"] = m_port_id;
get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data);
+
}
-double
-TrexStatelessPort::calculate_m_from_bps(double max_bps) {
- /* fetch all the streams from the table */
- vector<TrexStream *> streams;
- get_object_list(streams);
-
- TrexStreamsGraph graph;
- const TrexStreamsGraphObj &obj = graph.generate(streams);
-
- return (max_bps / obj.get_max_bps());
-}
-
-double
-TrexStatelessPort::calculate_m_from_pps(double max_pps) {
- /* fetch all the streams from the table */
- vector<TrexStream *> streams;
- get_object_list(streams);
-
- TrexStreamsGraph graph;
- const TrexStreamsGraphObj &obj = graph.generate(streams);
-
- return (max_pps / obj.get_max_pps());
-}
-
/**
* stop traffic on port
*
@@ -180,17 +201,20 @@ void
TrexStatelessPort::stop_traffic(void) {
if (!( (m_port_state == PORT_STATE_TX)
- || (m_port_state ==PORT_STATE_PAUSE) )) {
+ || (m_port_state == PORT_STATE_PAUSE) )) {
return;
}
+ /* delete any previous graphs */
+ delete_streams_graph();
+
/* mask out the DP stop event */
m_dp_events.disable(TrexDpPortEvent::EVENT_STOP);
/* generate a message to all the relevant DP cores to start transmitting */
TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id);
- send_message_to_dp(stop_msg);
+ send_message_to_all_dp(stop_msg);
change_state(PORT_STATE_STREAMS);
@@ -213,11 +237,15 @@ TrexStatelessPort::pause_traffic(void) {
throw TrexRpcException(" pause is supported when duration is not enable is start command ");
}
- TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpPause(m_port_id);
+ TrexStatelessCpToDpMsgBase *pause_msg = new TrexStatelessDpPause(m_port_id);
- send_message_to_dp(stop_msg);
+ send_message_to_all_dp(pause_msg);
change_state(PORT_STATE_PAUSE);
+
+ Json::Value data;
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_PAUSED, data);
}
void
@@ -226,23 +254,54 @@ TrexStatelessPort::resume_traffic(void) {
verify_state(PORT_STATE_PAUSE);
/* generate a message to all the relevant DP cores to start transmitting */
- TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpResume(m_port_id);
+ TrexStatelessCpToDpMsgBase *resume_msg = new TrexStatelessDpResume(m_port_id);
- send_message_to_dp(stop_msg);
+ send_message_to_all_dp(resume_msg);
change_state(PORT_STATE_TX);
+
+
+ Json::Value data;
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_RESUMED, data);
}
void
-TrexStatelessPort::update_traffic(double mul) {
+TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul) {
+
+ double factor;
verify_state(PORT_STATE_TX | PORT_STATE_PAUSE);
/* generate a message to all the relevant DP cores to start transmitting */
- double per_core_mul = mul / m_cores_id_list.size();
- TrexStatelessCpToDpMsgBase *update_msg = new TrexStatelessDpUpdate(m_port_id, per_core_mul);
+ double new_factor = calculate_effective_factor(mul);
+
+ switch (mul.m_op) {
+ case TrexPortMultiplier::OP_ABS:
+ factor = new_factor / m_factor;
+ break;
+
+ case TrexPortMultiplier::OP_ADD:
+ factor = (m_factor + new_factor) / m_factor;
+ break;
+
+ case TrexPortMultiplier::OP_SUB:
+ factor = (m_factor - new_factor) / m_factor;
+ if (factor <= 0) {
+ throw TrexRpcException("Update request will lower traffic to less than zero");
+ }
+ break;
+
+ default:
+ assert(0);
+ break;
+ }
+
+ TrexStatelessCpToDpMsgBase *update_msg = new TrexStatelessDpUpdate(m_port_id, factor);
- send_message_to_dp(update_msg);
+ send_message_to_all_dp(update_msg);
+
+ m_factor *= factor;
}
@@ -295,27 +354,6 @@ TrexStatelessPort::change_state(port_state_e new_state) {
m_port_state = new_state;
}
-/**
- * generate a random connection handler
- *
- */
-std::string
-TrexStatelessPort::generate_handler() {
- std::stringstream ss;
-
- static const char alphanum[] =
- "0123456789"
- "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
- "abcdefghijklmnopqrstuvwxyz";
-
- /* generate 8 bytes of random handler */
- for (int i = 0; i < 8; ++i) {
- ss << alphanum[rand() % (sizeof(alphanum) - 1)];
- }
-
- return (ss.str());
-}
-
void
TrexStatelessPort::encode_stats(Json::Value &port) {
@@ -341,15 +379,22 @@ TrexStatelessPort::encode_stats(Json::Value &port) {
}
void
-TrexStatelessPort::send_message_to_dp(TrexStatelessCpToDpMsgBase *msg) {
+TrexStatelessPort::send_message_to_all_dp(TrexStatelessCpToDpMsgBase *msg) {
for (auto core_id : m_cores_id_list) {
-
- /* send the message to the core */
- CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id);
- ring->Enqueue((CGenNode *)msg->clone());
+ send_message_to_dp(core_id, msg->clone());
}
+ /* original was not sent - delete it */
+ delete msg;
+}
+
+void
+TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg) {
+
+ /* send the message to the core */
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id);
+ ring->Enqueue((CGenNode *)msg);
}
/**
@@ -376,3 +421,201 @@ TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) {
}
}
+
+uint64_t
+TrexStatelessPort::get_port_speed_bps() const {
+ switch (m_speed) {
+ case TrexPlatformApi::SPEED_1G:
+ return (1LLU * 1000 * 1000 * 1000);
+
+ case TrexPlatformApi::SPEED_10G:
+ return (10LLU * 1000 * 1000 * 1000);
+
+ case TrexPlatformApi::SPEED_40G:
+ return (40LLU * 1000 * 1000 * 1000);
+
+ default:
+ return 0;
+ }
+}
+
+double
+TrexStatelessPort::calculate_effective_factor(const TrexPortMultiplier &mul) {
+
+ /* for a simple factor request */
+ if (mul.m_type == TrexPortMultiplier::MUL_FACTOR) {
+ return (mul.m_value);
+ }
+
+ /* we now need the graph - generate it if we don't have it (happens once) */
+ if (!m_graph_obj) {
+ generate_streams_graph();
+ }
+
+ switch (mul.m_type) {
+ case TrexPortMultiplier::MUL_BPS:
+ return (mul.m_value / m_graph_obj->get_max_bps());
+
+ case TrexPortMultiplier::MUL_PPS:
+ return (mul.m_value / m_graph_obj->get_max_pps());
+
+ case TrexPortMultiplier::MUL_PERCENTAGE:
+ /* if abs percentage is from the line speed - otherwise its from the current speed */
+
+ if (mul.m_op == TrexPortMultiplier::OP_ABS) {
+ double required = (mul.m_value / 100.0) * get_port_speed_bps();
+ return (required / m_graph_obj->get_max_bps());
+ } else {
+ return (m_factor * (mul.m_value / 100.0));
+ }
+
+ default:
+ assert(0);
+ }
+
+}
+
+
+void
+TrexStatelessPort::generate_streams_graph() {
+
+ /* dispose of the old one */
+ if (m_graph_obj) {
+ delete_streams_graph();
+ }
+
+ /* fetch all the streams from the table */
+ vector<TrexStream *> streams;
+ get_object_list(streams);
+
+ TrexStreamsGraph graph;
+ m_graph_obj = graph.generate(streams);
+}
+
+void
+TrexStatelessPort::delete_streams_graph() {
+ if (m_graph_obj) {
+ delete m_graph_obj;
+ m_graph_obj = NULL;
+ }
+}
+
+
+
+/***************************
+ * port multiplier
+ *
+ **************************/
+const std::initializer_list<std::string> TrexPortMultiplier::g_types = {"raw", "bps", "pps", "percentage"};
+const std::initializer_list<std::string> TrexPortMultiplier::g_ops = {"abs", "add", "sub"};
+
+TrexPortMultiplier::
+TrexPortMultiplier(const std::string &type_str, const std::string &op_str, double value) {
+ mul_type_e type;
+ mul_op_e op;
+
+ if (type_str == "raw") {
+ type = MUL_FACTOR;
+
+ } else if (type_str == "bps") {
+ type = MUL_BPS;
+
+ } else if (type_str == "pps") {
+ type = MUL_PPS;
+
+ } else if (type_str == "percentage") {
+ type = MUL_PERCENTAGE;
+ } else {
+ throw TrexException("bad type str: " + type_str);
+ }
+
+ if (op_str == "abs") {
+ op = OP_ABS;
+
+ } else if (op_str == "add") {
+ op = OP_ADD;
+
+ } else if (op_str == "sub") {
+ op = OP_SUB;
+
+ } else {
+ throw TrexException("bad op str: " + op_str);
+ }
+
+ m_type = type;
+ m_op = op;
+ m_value = value;
+
+}
+
+const TrexStreamsGraphObj *
+TrexStatelessPort::validate(void) {
+
+ /* first compile the graph */
+
+ vector<TrexStream *> streams;
+ get_object_list(streams);
+
+ if (streams.size() == 0) {
+ throw TrexException("no streams attached to port");
+ }
+
+ TrexStreamsCompiler compiler;
+ std::vector<TrexStreamsCompiledObj *> compiled_objs;
+
+ std::string fail_msg;
+ bool rc = compiler.compile(m_port_id,
+ streams,
+ compiled_objs,
+ get_dp_core_count(),
+ 1.0,
+ &fail_msg);
+ if (!rc) {
+ throw TrexException(fail_msg);
+ }
+
+ for (auto obj : compiled_objs) {
+ delete obj;
+ }
+
+ /* now create a stream graph */
+ if (!m_graph_obj) {
+ generate_streams_graph();
+ }
+
+ return m_graph_obj;
+}
+
+
+/************* Trex Port Owner **************/
+
+TrexPortOwner::TrexPortOwner() {
+ m_is_free = true;
+
+ /* for handlers random generation */
+ srand(time(NULL));
+}
+
+/**
+ * generate a random connection handler
+ *
+ */
+std::string
+TrexPortOwner::generate_handler() {
+ std::stringstream ss;
+
+ static const char alphanum[] =
+ "0123456789"
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+ "abcdefghijklmnopqrstuvwxyz";
+
+ /* generate 8 bytes of random handler */
+ for (int i = 0; i < 8; ++i) {
+ ss << alphanum[rand() % (sizeof(alphanum) - 1)];
+ }
+
+ return (ss.str());
+}
+
+const std::string TrexPortOwner::g_unowned_name = "<FREE>";
+const std::string TrexPortOwner::g_unowned_handler = "";
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index b061a414..a529d38f 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -26,6 +26,79 @@ limitations under the License.
#include <internal_api/trex_platform_api.h>
class TrexStatelessCpToDpMsgBase;
+class TrexStreamsGraphObj;
+class TrexPortMultiplier;
+
+/**
+ * TRex port owner can perform
+ * write commands
+ * while port is owned - others can
+ * do read only commands
+ *
+ */
+class TrexPortOwner {
+public:
+
+ TrexPortOwner();
+
+ /**
+ * is port free to acquire
+ */
+ bool is_free() {
+ return m_is_free;
+ }
+
+ void release() {
+ m_is_free = true;
+ m_owner_name = "";
+ m_handler = "";
+ }
+
+ bool is_owned_by(const std::string &user) {
+ return ( !m_is_free && (m_owner_name == user) );
+ }
+
+ void own(const std::string &owner_name) {
+
+ /* save user data */
+ m_owner_name = owner_name;
+
+ /* internal data */
+ m_handler = generate_handler();
+ m_is_free = false;
+ }
+
+ bool verify(const std::string &handler) {
+ return ( (!m_is_free) && (m_handler == handler) );
+ }
+
+ const std::string &get_name() {
+ return (!m_is_free ? m_owner_name : g_unowned_name);
+ }
+
+ const std::string &get_handler() {
+ return (!m_is_free ? m_handler : g_unowned_handler);
+ }
+
+
+private:
+ std::string generate_handler();
+
+ /* is this port owned by someone ? */
+ bool m_is_free;
+
+ /* user provided info */
+ std::string m_owner_name;
+
+ /* handler genereated internally */
+ std::string m_handler;
+
+
+ /* just references defaults... */
+ static const std::string g_unowned_name;
+ static const std::string g_unowned_handler;
+};
+
/**
* describes a stateless port
@@ -58,13 +131,14 @@ public:
RC_ERR_FAILED_TO_COMPILE_STREAMS
};
+
TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api);
/**
* acquire port
* throws TrexException in case of an error
*/
- void acquire(const std::string &user, bool force = false);
+ void acquire(const std::string &user, uint32_t session_id, bool force = false);
/**
* release the port from the current user
@@ -73,22 +147,20 @@ public:
void release(void);
/**
- * start traffic
- * throws TrexException in case of an error
+ * validate the state of the port before start
+ * it will return a stream graph
+ * containing information about the streams
+ * configured on this port
+ *
+ * on error it throws TrexException
*/
- void start_traffic(double mul, double duration = -1);
+ const TrexStreamsGraphObj *validate(void);
/**
- * given a BPS rate calculate ther correct M for this port
- *
- */
- double calculate_m_from_bps(double max_bps);
-
- /**
- * given a PPS rate calculate the correct M for this port
- *
+ * start traffic
+ * throws TrexException in case of an error
*/
- double calculate_m_from_pps(double max_pps);
+ void start_traffic(const TrexPortMultiplier &mul, double duration);
/**
* stop traffic
@@ -112,7 +184,7 @@ public:
* update current traffic on port
*
*/
- void update_traffic(double mul);
+ void update_traffic(const TrexPortMultiplier &mul);
/**
* get the port state
@@ -139,29 +211,6 @@ public:
void get_properties(std::string &driver, TrexPlatformApi::driver_speed_e &speed);
- /**
- * query for ownership
- *
- */
- const std::string &get_owner() {
- return m_owner;
- }
-
- /**
- * owner handler
- * for the connection
- *
- */
- const std::string &get_owner_handler() {
- return m_owner_handler;
- }
-
-
- bool verify_owner_handler(const std::string &handler) {
-
- return ( (m_owner != "none") && (m_owner_handler == handler) );
-
- }
/**
* encode stats as JSON
@@ -181,6 +230,7 @@ public:
verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS);
m_stream_table.add_stream(stream);
+ delete_streams_graph();
change_state(PORT_STATE_STREAMS);
}
@@ -189,6 +239,7 @@ public:
verify_state(PORT_STATE_STREAMS);
m_stream_table.remove_stream(stream);
+ delete_streams_graph();
if (m_stream_table.size() == 0) {
change_state(PORT_STATE_IDLE);
@@ -199,6 +250,7 @@ public:
verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS);
m_stream_table.remove_and_delete_all_streams();
+ delete_streams_graph();
change_state(PORT_STATE_IDLE);
}
@@ -220,31 +272,34 @@ public:
}
-
-private:
-
-
-
/**
- * take ownership of the server array
- * this is static
- * ownership is total
- *
- */
- void set_owner(const std::string &owner) {
- m_owner = owner;
- m_owner_handler = generate_handler();
+ * returns the number of DP cores linked to this port
+ *
+ */
+ uint8_t get_dp_core_count() {
+ return m_cores_id_list.size();
}
- void clear_owner() {
- m_owner = "none";
- m_owner_handler = "";
+ /**
+ * returns the traffic multiplier currently being used by the DP
+ *
+ */
+ double get_multiplier() {
+ return (m_factor);
}
- bool is_free_to_aquire() {
- return (m_owner == "none");
+ /**
+ * get port speed in bits per second
+ *
+ */
+ uint64_t get_port_speed_bps() const;
+
+ TrexPortOwner & get_owner() {
+ return m_owner;
}
+private:
+
const std::vector<int> get_core_id_list () {
return m_cores_id_list;
@@ -256,7 +311,17 @@ private:
std::string generate_handler();
- void send_message_to_dp(TrexStatelessCpToDpMsgBase *msg);
+ /**
+ * send message to all cores using duplicate
+ *
+ */
+ void send_message_to_all_dp(TrexStatelessCpToDpMsgBase *msg);
+
+ /**
+ * send message to specific DP core
+ *
+ */
+ void send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg);
/**
* triggered when event occurs
@@ -265,22 +330,97 @@ private:
void on_dp_event_occured(TrexDpPortEvent::event_e event_type);
+ /**
+ * calculate effective M per core
+ *
+ */
+ double calculate_effective_factor(const TrexPortMultiplier &mul);
+
+
+
+ /**
+ * generates a graph of streams graph
+ *
+ */
+ void generate_streams_graph();
+
+ /**
+ * dispose of it
+ *
+ * @author imarom (26-Nov-15)
+ */
+ void delete_streams_graph();
+
+
TrexStreamTable m_stream_table;
uint8_t m_port_id;
port_state_e m_port_state;
- std::string m_owner;
- std::string m_owner_handler;
std::string m_driver_name;
TrexPlatformApi::driver_speed_e m_speed;
/* holds the DP cores associated with this port */
- std::vector<int> m_cores_id_list;
+ std::vector<int> m_cores_id_list;
bool m_last_all_streams_continues;
double m_last_duration;
+ double m_factor;
TrexDpPortEvents m_dp_events;
+
+ /* holds a graph of streams rate*/
+ const TrexStreamsGraphObj *m_graph_obj;
+
+ /* owner information */
+ TrexPortOwner m_owner;
+};
+
+
+/**
+ * port multiplier object
+ *
+ */
+class TrexPortMultiplier {
+public:
+
+
+ /**
+ * defines the type of multipler passed to start
+ */
+ enum mul_type_e {
+ MUL_FACTOR,
+ MUL_BPS,
+ MUL_PPS,
+ MUL_PERCENTAGE
+ };
+
+ /**
+ * multiplier can be absolute value
+ * increment value or subtract value
+ */
+ enum mul_op_e {
+ OP_ABS,
+ OP_ADD,
+ OP_SUB
+ };
+
+
+ TrexPortMultiplier(mul_type_e type, mul_op_e op, double value) {
+ m_type = type;
+ m_op = op;
+ m_value = value;
+ }
+
+ TrexPortMultiplier(const std::string &type_str, const std::string &op_str, double value);
+
+
+public:
+ static const std::initializer_list<std::string> g_types;
+ static const std::initializer_list<std::string> g_ops;
+
+ mul_type_e m_type;
+ mul_op_e m_op;
+ double m_value;
};
#endif /* __TREX_STATELESS_PORT_H__ */
diff --git a/src/stateless/cp/trex_stream.cpp b/src/stateless/cp/trex_stream.cpp
index cad603e2..02f43a3a 100644
--- a/src/stateless/cp/trex_stream.cpp
+++ b/src/stateless/cp/trex_stream.cpp
@@ -53,6 +53,36 @@ std::string TrexStream::get_stream_type_str(stream_type_t stream_type){
}
+void
+TrexStream::compile() {
+
+ /* in case there are no instructions - nothing to do */
+ if (m_vm.is_vm_empty()) {
+ m_has_vm = false;
+ return;
+ }
+
+ m_has_vm = true;
+
+ m_vm.set_packet_size(m_pkt.len);
+
+ m_vm.compile();
+
+ #if 0
+ m_vm.Dump(stdout);
+ #endif
+
+ m_vm_dp = m_vm.cloneAsVmDp();
+
+ /* calc m_vm_prefix_size which is the size of the writable packet */
+ uint16_t max_pkt_offset = m_vm_dp->get_max_packet_update_offset();
+ uint16_t pkt_size = m_pkt.len;
+
+ /* calculate the mbuf size that we should allocate */
+ m_vm_prefix_size = calc_writable_mbuf_size(max_pkt_offset, pkt_size);
+}
+
+
void TrexStream::Dump(FILE *fd){
fprintf(fd,"\n");
@@ -103,6 +133,8 @@ TrexStream::TrexStream(uint8_t type,
m_pkt.binary = NULL;
m_pkt.len = 0;
+ m_has_vm = false;
+ m_vm_prefix_size = 0;
m_rx_check.m_enable = false;
@@ -111,12 +143,17 @@ TrexStream::TrexStream(uint8_t type,
m_burst_total_pkts=0;
m_num_bursts=1;
m_ibg_usec=0.0;
+ m_vm_dp = NULL;
}
TrexStream::~TrexStream() {
if (m_pkt.binary) {
delete [] m_pkt.binary;
}
+ if ( m_vm_dp ){
+ delete m_vm_dp;
+ m_vm_dp=NULL;
+ }
}
void
@@ -199,3 +236,4 @@ int TrexStreamTable::size() {
return m_stream_table.size();
}
+
diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h
index 3e48d7e4..720246f6 100644
--- a/src/stateless/cp/trex_stream.h
+++ b/src/stateless/cp/trex_stream.h
@@ -36,6 +36,54 @@ limitations under the License.
class TrexRpcCmdAddStream;
+static inline uint16_t get_log2_size(uint16_t size){
+
+ uint16_t _sizes[]={64,128,256,512,1024,2048};
+ int i;
+ for (i=0; i<sizeof(_sizes)/sizeof(_sizes[0]); i++) {
+ if (size<=_sizes[i]) {
+ return (_sizes[i]);
+ }
+ }
+ assert(0);
+ return (0);
+}
+
+/**
+ * calculate the size of writable mbuf in bytes. maximum size if packet size
+ *
+ * @param max_offset_writable
+ * the last byte that we don't write too. for example when 63 it means that bytes [62] in the array is written (zero base)
+ * @param pkt_size packet size in bytes
+ *
+ * @return the writable size of the first mbuf . the idea is to give at least 64 bytes const mbuf else all packet will be writeable
+ *
+ * examples:
+ * max_offset_writable =63
+ * pkt_size =62
+ * ==>62
+ *
+ */
+static inline uint16_t calc_writable_mbuf_size(uint16_t max_offset_writable,
+ uint16_t pkt_size){
+
+ if ( pkt_size<=64 ){
+ return (pkt_size);
+ }
+ if (pkt_size<=128) {
+ return (pkt_size);
+ }
+
+ //pkt_size> 128
+ uint16_t non_writable = pkt_size - (max_offset_writable -1) ;
+ if ( non_writable<64 ) {
+ return (pkt_size);
+ }
+ return(max_offset_writable-1);
+}
+
+
+
struct CStreamPktData {
uint8_t *binary;
uint16_t len;
@@ -129,9 +177,18 @@ public:
}
/* create new stream */
- TrexStream * clone_as_dp(){
- TrexStream * dp=new TrexStream(m_type,m_port_id,m_stream_id);
-
+ TrexStream * clone_as_dp() const {
+
+ TrexStream *dp = new TrexStream(m_type,m_port_id,m_stream_id);
+ dp->m_has_vm = m_has_vm;
+ if (m_vm_dp) {
+ /* should have vm */
+ assert(m_has_vm);
+ dp->m_vm_dp = m_vm_dp->clone();
+ }else{
+ dp->m_vm_dp = NULL;
+ }
+ dp->m_vm_prefix_size = m_vm_prefix_size;
dp->m_isg_usec = m_isg_usec;
dp->m_next_stream_id = m_next_stream_id;
@@ -161,10 +218,28 @@ public:
}
void Dump(FILE *fd);
+
+ bool is_vm(){
+ return ( m_has_vm );
+ }
+
+ StreamVmDp * getDpVm(){
+ return ( m_vm_dp);
+ }
+
+ void post_vm_compile();
+
+ /**
+ * internal compilation of stream (for DP)
+ *
+ */
+ void compile();
+
public:
/* basic */
uint8_t m_type;
uint8_t m_port_id;
+ uint16_t m_vm_prefix_size; /* writeable mbuf size */
uint32_t m_stream_id; /* id from RPC can be anything */
@@ -175,6 +250,10 @@ public:
/* indicators */
bool m_enabled;
bool m_self_start;
+ bool m_has_vm; /* do we have instructions to run */
+
+
+ StreamVmDp * m_vm_dp; /* compile VM */
CStreamPktData m_pkt;
/* pkt */
diff --git a/src/stateless/cp/trex_stream_vm.cpp b/src/stateless/cp/trex_stream_vm.cpp
index 2e760ae9..e10e1a81 100644
--- a/src/stateless/cp/trex_stream_vm.cpp
+++ b/src/stateless/cp/trex_stream_vm.cpp
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -19,6 +20,108 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
#include <trex_stream_vm.h>
+#include <sstream>
+#include <assert.h>
+#include <iostream>
+#include <trex_stateless.h>
+#include <common/Network/Packet/IPHeader.h>
+#include <common/basic_utils.h>
+
+
+
+
+void StreamVmInstructionFixChecksumIpv4::Dump(FILE *fd){
+ fprintf(fd," fix_check_sum , %lu \n",(ulong)m_pkt_offset);
+}
+
+
+void StreamVmInstructionFlowMan::sanity_check_valid_size(uint32_t ins_id,StreamVm *lp){
+ uint8_t valid[]={1,2,4,8};
+ int i;
+ for (i=0; i<sizeof(valid)/sizeof(valid[0]); i++) {
+ if (valid[i]==m_size_bytes) {
+ return;
+ }
+ }
+
+ std::stringstream ss;
+
+ ss << "instruction id '" << ins_id << "' has non valid length " << m_size_bytes ;
+
+ lp->err(ss.str());
+}
+
+void StreamVmInstructionFlowMan::sanity_check_valid_opt(uint32_t ins_id,StreamVm *lp){
+ uint8_t valid[]={FLOW_VAR_OP_INC,
+ FLOW_VAR_OP_DEC,
+ FLOW_VAR_OP_RANDOM};
+ int i;
+ for (i=0; i<sizeof(valid)/sizeof(valid[0]); i++) {
+ if (valid[i]==m_op) {
+ return;
+ }
+ }
+
+ std::stringstream ss;
+
+ ss << "instruction id '" << ins_id << "' has non valid op " << (int)m_op ;
+
+ lp->err(ss.str());
+}
+
+void StreamVmInstructionFlowMan::sanity_check_valid_range(uint32_t ins_id,StreamVm *lp){
+ //TBD check that init,min,max in valid range
+}
+
+
+
+void StreamVmInstructionFlowMan::sanity_check(uint32_t ins_id,StreamVm *lp){
+
+ sanity_check_valid_size(ins_id,lp);
+ sanity_check_valid_opt(ins_id,lp);
+ sanity_check_valid_range(ins_id,lp);
+}
+
+
+void StreamVmInstructionFlowMan::Dump(FILE *fd){
+ fprintf(fd," flow_var , %s ,%lu, ",m_var_name.c_str(),(ulong)m_size_bytes);
+
+ switch (m_op) {
+
+ case FLOW_VAR_OP_INC :
+ fprintf(fd," INC ,");
+ break;
+ case FLOW_VAR_OP_DEC :
+ fprintf(fd," DEC ,");
+ break;
+ case FLOW_VAR_OP_RANDOM :
+ fprintf(fd," RANDOM ,");
+ break;
+ default:
+ fprintf(fd," UNKNOWN,");
+ break;
+ };
+
+ fprintf(fd," (%lu:%lu:%lu) \n",m_init_value,m_min_value,m_max_value);
+}
+
+
+void StreamVmInstructionWriteToPkt::Dump(FILE *fd){
+
+ fprintf(fd," write_pkt , %s ,%lu, add, %ld, big, %lu \n",m_flow_var_name.c_str(),(ulong)m_pkt_offset,(long)m_add_value,(ulong)(m_is_big_endian?1:0));
+}
+
+
+
+void StreamVmInstructionFlowClient::Dump(FILE *fd){
+
+ fprintf(fd," client_var ,%s , ",m_var_name.c_str());
+
+ fprintf(fd," ip:(%x-%x) port:(%x-%x) flow_limit:%lu flags: %x\n",m_client_min,m_client_max, m_port_min,m_port_max,(ulong)m_limit_num_flows,m_flags);
+}
+
+
+
/***************************
* StreamVmInstruction
@@ -36,19 +139,714 @@ void StreamVm::add_instruction(StreamVmInstruction *inst) {
m_inst_list.push_back(inst);
}
+StreamDPVmInstructions *
+StreamVm::get_dp_instruction_buffer(){
+ return &m_instructions;
+}
+
+
const std::vector<StreamVmInstruction *> &
StreamVm::get_instruction_list() {
return m_inst_list;
}
-bool StreamVm::compile() {
- /* implement me */
- return (false);
+void StreamVm::var_clear_table(){
+ m_flow_var_offset.clear();
+}
+
+bool StreamVm::var_add(const std::string &var_name,VmFlowVarRec & var){
+ m_flow_var_offset[var_name] = var;
+ return (true);
+}
+
+
+uint16_t StreamVm::get_var_offset(const std::string &var_name){
+ VmFlowVarRec var;
+ bool res=var_lookup(var_name,var);
+ assert(res);
+ return (var.m_offset);
+}
+
+
+bool StreamVm::var_lookup(const std::string &var_name,VmFlowVarRec & var){
+ auto search = m_flow_var_offset.find(var_name);
+
+ if (search != m_flow_var_offset.end()) {
+ var = search->second;
+ return true;
+ } else {
+ return false;
+ }
+}
+
+
+
+void StreamVm::err(const std::string &err){
+ throw TrexException("*** error: " + err);
+}
+
+
+void StreamVm::build_flow_var_table() {
+
+ var_clear_table();
+ m_cur_var_offset=0;
+ uint32_t ins_id=0;
+ /* scan all flow var instruction and build */
+ for (auto inst : m_inst_list) {
+ if ( inst->get_instruction_type() == StreamVmInstruction::itFLOW_MAN ){
+
+ StreamVmInstructionFlowMan * ins_man=(StreamVmInstructionFlowMan *)inst;
+
+ /* check that instruction is valid */
+ ins_man->sanity_check(ins_id,this);
+
+ VmFlowVarRec var;
+ /* if this is the first time */
+ if ( var_lookup( ins_man->m_var_name,var) == true){
+ std::stringstream ss;
+ ss << "instruction id '" << ins_id << "' flow variable name " << ins_man->m_var_name << " already exists";
+ err(ss.str());
+ }else{
+ var.m_offset=m_cur_var_offset;
+ var.m_ins.m_ins_flowv = ins_man;
+ var.m_size_bytes = ins_man->m_size_bytes;
+ var_add(ins_man->m_var_name,var);
+ m_cur_var_offset += ins_man->m_size_bytes;
+
+ }
+ }
+
+ if ( inst->get_instruction_type() == StreamVmInstruction::itFLOW_CLIENT ){
+ StreamVmInstructionFlowClient * ins_man=(StreamVmInstructionFlowClient *)inst;
+
+ VmFlowVarRec var;
+ /* if this is the first time */
+ if ( var_lookup( ins_man->m_var_name+".ip",var) == true){
+ std::stringstream ss;
+ ss << "instruction id '" << ins_id << "' client variable name " << ins_man->m_var_name << " already exists";
+ err(ss.str());
+ }
+ if ( var_lookup( ins_man->m_var_name+".port",var) == true){
+ std::stringstream ss;
+ ss << "instruction id '" << ins_id << "' client variable name " << ins_man->m_var_name << " already exists";
+ err(ss.str());
+ }
+
+ if ( var_lookup( ins_man->m_var_name+".flow_limit",var) == true){
+ std::stringstream ss;
+ ss << "instruction id '" << ins_id << "' client variable name " << ins_man->m_var_name << " already exists";
+ err(ss.str());
+ }
+
+ var.m_offset = m_cur_var_offset;
+ var.m_ins.m_ins_flow_client = ins_man;
+ var.m_size_bytes =4;
+
+ VmFlowVarRec var_port;
+
+ var_port.m_offset = m_cur_var_offset+4;
+ var_port.m_ins.m_ins_flow_client = ins_man;
+ var_port.m_size_bytes =2;
+
+ VmFlowVarRec var_flow_limit;
+
+ var_flow_limit.m_offset = m_cur_var_offset+6;
+ var_flow_limit.m_ins.m_ins_flow_client = ins_man;
+ var_flow_limit.m_size_bytes =4;
+
+
+ var_add(ins_man->m_var_name+".ip",var);
+ var_add(ins_man->m_var_name+".port",var_port);
+ var_add(ins_man->m_var_name+".flow_limit",var_flow_limit);
+
+ m_cur_var_offset += StreamVmInstructionFlowClient::get_flow_var_size();
+
+ assert(sizeof(StreamDPFlowClient)==StreamVmInstructionFlowClient::get_flow_var_size());
+ }
+
+ /* limit the flow var size */
+ if (m_cur_var_offset > StreamVm::svMAX_FLOW_VAR ) {
+ std::stringstream ss;
+ ss << "too many flow varibles current size is :" << m_cur_var_offset << " maximum support is " << StreamVm::svMAX_FLOW_VAR;
+ err(ss.str());
+ }
+ ins_id++;
+ }
+
+}
+
+void StreamVm::alloc_bss(){
+ free_bss();
+ m_bss=(uint8_t *)malloc(m_cur_var_offset);
+}
+
+void StreamVm::clean_max_field_cnt(){
+ m_max_field_update=0;
+}
+
+void StreamVm::add_field_cnt(uint16_t new_cnt){
+
+ if ( new_cnt > m_max_field_update) {
+ m_max_field_update = new_cnt;
+ }
+}
+
+
+void StreamVm::free_bss(){
+ if (m_bss) {
+ free(m_bss);
+ m_bss=0;
+ }
+}
+
+
+void StreamVm::build_program(){
+
+ /* build the commands into a buffer */
+ m_instructions.clear();
+ clean_max_field_cnt();
+ uint32_t ins_id=0;
+
+ for (auto inst : m_inst_list) {
+ StreamVmInstruction::instruction_type_t ins_type=inst->get_instruction_type();
+
+ /* itFIX_IPV4_CS */
+ if (ins_type == StreamVmInstruction::itFIX_IPV4_CS) {
+ StreamVmInstructionFixChecksumIpv4 *lpFix =(StreamVmInstructionFixChecksumIpv4 *)inst;
+
+ add_field_cnt(lpFix->m_pkt_offset +12);
+
+ if ( (lpFix->m_pkt_offset + IPV4_HDR_LEN) > m_pkt_size ) {
+
+ std::stringstream ss;
+ ss << "instruction id '" << ins_id << "' fix ipv4 command offset " << lpFix->m_pkt_offset << " is too high relative to packet size "<< m_pkt_size;
+ err(ss.str());
+ }
+
+ StreamDPOpIpv4Fix ipv_fix;
+ ipv_fix.m_offset = lpFix->m_pkt_offset;
+ ipv_fix.m_op = StreamDPVmInstructions::ditFIX_IPV4_CS;
+ m_instructions.add_command(&ipv_fix,sizeof(ipv_fix));
+ }
+
+
+ /* flow man */
+ if (ins_type == StreamVmInstruction::itFLOW_MAN) {
+ StreamVmInstructionFlowMan *lpMan =(StreamVmInstructionFlowMan *)inst;
+
+
+ if (lpMan->m_size_bytes == 1 ){
+
+ uint8_t op=StreamDPVmInstructions::ditINC8;
+
+ if ( lpMan->m_op == StreamVmInstructionFlowMan::FLOW_VAR_OP_INC ){
+ op = StreamDPVmInstructions::ditINC8 ;
+ }
+
+ if ( lpMan->m_op == StreamVmInstructionFlowMan::FLOW_VAR_OP_DEC ){
+ op = StreamDPVmInstructions::ditDEC8 ;
+ }
+
+ if ( lpMan->m_op == StreamVmInstructionFlowMan::FLOW_VAR_OP_RANDOM ){
+ op = StreamDPVmInstructions::ditRANDOM8 ;
+ }
+
+ StreamDPOpFlowVar8 fv8;
+ fv8.m_op = op;
+ fv8.m_flow_offset = get_var_offset(lpMan->m_var_name);
+ fv8.m_min_val = (uint8_t)lpMan->m_min_value;
+ fv8.m_max_val = (uint8_t)lpMan->m_max_value;
+ m_instructions.add_command(&fv8,sizeof(fv8));
+ }
+
+ if (lpMan->m_size_bytes == 2 ){
+ uint8_t op;
+
+ op = StreamDPVmInstructions::ditINC16;
+
+ if ( lpMan->m_op == StreamVmInstructionFlowMan::FLOW_VAR_OP_INC ){
+ op = StreamDPVmInstructions::ditINC16 ;
+ }
+
+ if ( lpMan->m_op == StreamVmInstructionFlowMan::FLOW_VAR_OP_DEC ){
+ op = StreamDPVmInstructions::ditDEC16 ;
+ }
+
+ if ( lpMan->m_op == StreamVmInstructionFlowMan::FLOW_VAR_OP_RANDOM ){
+ op = StreamDPVmInstructions::ditRANDOM16 ;
+ }
+
+ StreamDPOpFlowVar16 fv16;
+ fv16.m_op = op;
+ fv16.m_flow_offset = get_var_offset(lpMan->m_var_name);
+ fv16.m_min_val = (uint16_t)lpMan->m_min_value;
+ fv16.m_max_val = (uint16_t)lpMan->m_max_value;
+ m_instructions.add_command(&fv16,sizeof(fv16));
+ }
+
+ if (lpMan->m_size_bytes == 4 ){
+ uint8_t op;
+
+ op = StreamDPVmInstructions::ditINC32;
+
+ if ( lpMan->m_op == StreamVmInstructionFlowMan::FLOW_VAR_OP_INC ){
+ op = StreamDPVmInstructions::ditINC32 ;
+ }
+
+ if ( lpMan->m_op == StreamVmInstructionFlowMan::FLOW_VAR_OP_DEC ){
+ op = StreamDPVmInstructions::ditDEC32 ;
+ }
+
+ if ( lpMan->m_op == StreamVmInstructionFlowMan::FLOW_VAR_OP_RANDOM ){
+ op = StreamDPVmInstructions::ditRANDOM32 ;
+ }
+
+ StreamDPOpFlowVar32 fv32;
+ fv32.m_op = op;
+ fv32.m_flow_offset = get_var_offset(lpMan->m_var_name);
+ fv32.m_min_val = (uint32_t)lpMan->m_min_value;
+ fv32.m_max_val = (uint32_t)lpMan->m_max_value;
+ m_instructions.add_command(&fv32,sizeof(fv32));
+ }
+
+ if (lpMan->m_size_bytes == 8 ){
+ uint8_t op;
+
+ op = StreamDPVmInstructions::ditINC64;
+
+ if ( lpMan->m_op == StreamVmInstructionFlowMan::FLOW_VAR_OP_INC ){
+ op = StreamDPVmInstructions::ditINC64 ;
+ }
+
+ if ( lpMan->m_op == StreamVmInstructionFlowMan::FLOW_VAR_OP_DEC ){
+ op = StreamDPVmInstructions::ditDEC64 ;
+ }
+
+ if ( lpMan->m_op == StreamVmInstructionFlowMan::FLOW_VAR_OP_RANDOM ){
+ op = StreamDPVmInstructions::ditRANDOM64 ;
+ }
+
+ StreamDPOpFlowVar32 fv64;
+ fv64.m_op = op;
+ fv64.m_flow_offset = get_var_offset(lpMan->m_var_name);
+ fv64.m_min_val = (uint64_t)lpMan->m_min_value;
+ fv64.m_max_val = (uint64_t)lpMan->m_max_value;
+ m_instructions.add_command(&fv64,sizeof(fv64));
+ }
+ }
+
+ if (ins_type == StreamVmInstruction::itPKT_WR) {
+ StreamVmInstructionWriteToPkt *lpPkt =(StreamVmInstructionWriteToPkt *)inst;
+
+ VmFlowVarRec var;
+ if ( var_lookup(lpPkt->m_flow_var_name ,var) == false){
+
+ std::stringstream ss;
+ ss << "instruction id '" << ins_id << "' packet write with no valid flow varible name '" << lpPkt->m_flow_var_name << "'" ;
+ err(ss.str());
+ }
+
+ if (lpPkt->m_pkt_offset + var.m_size_bytes > m_pkt_size ) {
+ std::stringstream ss;
+ ss << "instruction id '" << ins_id << "' packet write with packet_offset " << lpPkt->m_pkt_offset + var.m_size_bytes << " bigger than packet size "<< m_pkt_size;
+ err(ss.str());
+ }
+ add_field_cnt(lpPkt->m_pkt_offset + var.m_size_bytes);
+
+
+ uint8_t op_size=var.m_size_bytes;
+ bool is_big = lpPkt->m_is_big_endian;
+ uint8_t flags = (is_big?StreamDPOpPktWrBase::PKT_WR_IS_BIG:0);
+ uint8_t flow_offset = get_var_offset(lpPkt->m_flow_var_name);
+
+ if (op_size == 1) {
+ StreamDPOpPktWr8 pw8;
+ pw8.m_op = StreamDPVmInstructions::itPKT_WR8;
+ pw8.m_flags =flags;
+ pw8.m_offset =flow_offset;
+ pw8.m_pkt_offset = lpPkt->m_pkt_offset;
+ pw8.m_val_offset = (int8_t)lpPkt->m_add_value;
+ m_instructions.add_command(&pw8,sizeof(pw8));
+ }
+
+ if (op_size == 2) {
+ StreamDPOpPktWr16 pw16;
+ pw16.m_op = StreamDPVmInstructions::itPKT_WR16;
+ pw16.m_flags =flags;
+ pw16.m_offset =flow_offset;
+ pw16.m_pkt_offset = lpPkt->m_pkt_offset;
+ pw16.m_val_offset = (int16_t)lpPkt->m_add_value;
+ m_instructions.add_command(&pw16,sizeof(pw16));
+ }
+
+ if (op_size == 4) {
+ StreamDPOpPktWr32 pw32;
+ pw32.m_op = StreamDPVmInstructions::itPKT_WR32;
+ pw32.m_flags =flags;
+ pw32.m_offset =flow_offset;
+ pw32.m_pkt_offset = lpPkt->m_pkt_offset;
+ pw32.m_val_offset = (int32_t)lpPkt->m_add_value;
+ m_instructions.add_command(&pw32,sizeof(pw32));
+ }
+
+ if (op_size == 8) {
+ StreamDPOpPktWr64 pw64;
+ pw64.m_op = StreamDPVmInstructions::itPKT_WR64;
+ pw64.m_flags =flags;
+ pw64.m_offset =flow_offset;
+ pw64.m_pkt_offset = lpPkt->m_pkt_offset;
+ pw64.m_val_offset = (int64_t)lpPkt->m_add_value;
+ m_instructions.add_command(&pw64,sizeof(pw64));
+ }
+
+ }
+
+
+ if (ins_type == StreamVmInstruction::itFLOW_CLIENT) {
+ StreamVmInstructionFlowClient *lpMan =(StreamVmInstructionFlowClient *)inst;
+
+ if ( lpMan->is_unlimited_flows() ){
+ StreamDPOpClientsUnLimit client_cmd;
+ client_cmd.m_op = StreamDPVmInstructions::itCLIENT_VAR_UNLIMIT;
+
+ client_cmd.m_flow_offset = get_var_offset(lpMan->m_var_name+".ip"); /* start offset */
+ client_cmd.m_flags = 0; /* not used */
+ client_cmd.m_pad = 0;
+ client_cmd.m_min_ip = lpMan->m_client_min;
+ client_cmd.m_max_ip = lpMan->m_client_max;
+ m_instructions.add_command(&client_cmd,sizeof(client_cmd));
+
+ }else{
+ StreamDPOpClientsLimit client_cmd;
+ client_cmd.m_op = StreamDPVmInstructions::itCLIENT_VAR;
+
+ client_cmd.m_flow_offset = get_var_offset(lpMan->m_var_name+".ip"); /* start offset */
+ client_cmd.m_flags = 0; /* not used */
+ client_cmd.m_pad = 0;
+ client_cmd.m_min_port = lpMan->m_port_min;
+ client_cmd.m_max_port = lpMan->m_port_max;
+ client_cmd.m_min_ip = lpMan->m_client_min;
+ client_cmd.m_max_ip = lpMan->m_client_max;
+ client_cmd.m_limit_flows = lpMan->m_limit_num_flows;
+ m_instructions.add_command(&client_cmd,sizeof(client_cmd));
+ }
+ }
+
+ ins_id++;
+ }
+}
+
+
+void StreamVm::build_bss() {
+ alloc_bss();
+ uint8_t * p=(uint8_t *)m_bss;
+
+ for (auto inst : m_inst_list) {
+
+ if ( inst->get_instruction_type() == StreamVmInstruction::itFLOW_MAN ){
+
+ StreamVmInstructionFlowMan * ins_man=(StreamVmInstructionFlowMan *)inst;
+
+ switch (ins_man->m_size_bytes) {
+ case 1:
+ *p=(uint8_t)ins_man->m_init_value;
+ p+=1;
+ break;
+ case 2:
+ *((uint16_t*)p)=(uint16_t)ins_man->m_init_value;
+ p+=2;
+ break;
+ case 4:
+ *((uint32_t*)p)=(uint32_t)ins_man->m_init_value;
+ p+=4;
+ break;
+ case 8:
+ *((uint64_t*)p)=(uint64_t)ins_man->m_init_value;
+ p+=8;
+ break;
+ default:
+ assert(0);
+ }
+ }
+
+ if ( inst->get_instruction_type() == StreamVmInstruction::itFLOW_CLIENT ){
+
+ StreamVmInstructionFlowClient * ins_man=(StreamVmInstructionFlowClient *)inst;
+ if (ins_man->m_client_min>0) {
+ *((uint32_t*)p)=(uint32_t)(ins_man->m_client_min-1);
+ }else{
+ *((uint32_t*)p)=(uint32_t)ins_man->m_client_min;
+ }
+ p+=4;
+
+ if (ins_man->is_unlimited_flows() ) {
+ *((uint16_t*)p)=StreamDPOpClientsUnLimit::CLIENT_UNLIMITED_MIN_PORT;
+ }else{
+ *((uint16_t*)p)=(uint16_t)ins_man->m_port_min;
+ }
+ p+=2;
+
+ *((uint32_t*)p)=0;
+ p+=4;
+ }
+
+ }
+}
+
+
+
+void StreamVm::compile() {
+
+ /* build flow var offset table */
+ build_flow_var_table() ;
+
+ /* build init flow var memory */
+ build_bss();
+
+ build_program();
+
+ if ( get_max_packet_update_offset() >svMAX_PACKET_OFFSET_CHANGE ){
+ std::stringstream ss;
+ ss << "maximum offset is" << get_max_packet_update_offset() << " bigger than maximum " <<svMAX_PACKET_OFFSET_CHANGE;
+ err(ss.str());
+ }
}
+
StreamVm::~StreamVm() {
for (auto inst : m_inst_list) {
delete inst;
+ }
+ free_bss();
+}
+
+
+void StreamVm::Dump(FILE *fd){
+ fprintf(fd," instructions \n");
+ uint32_t cnt=0;
+ for (auto inst : m_inst_list) {
+ fprintf(fd," [%04lu] : ",(ulong)cnt);
+ inst->Dump(fd);
+ cnt++;
}
+
+ if ( get_bss_size() ) {
+ fprintf(fd," BSS size %lu\n",(ulong)get_bss_size());
+ utl_DumpBuffer(fd,get_bss_ptr(),get_bss_size(),0);
+ }
+
+ if ( m_instructions.get_program_size() > 0 ){
+ fprintf(fd," RAW instructions \n");
+ m_instructions.Dump(fd);
+ }
+}
+
+
+void StreamDPVmInstructions::clear(){
+ m_inst_list.clear();
+}
+
+
+void StreamDPVmInstructions::add_command(void *buffer,uint16_t size){
+ int i;
+ uint8_t *p= (uint8_t *)buffer;
+ /* push byte by byte */
+ for (i=0; i<size; i++) {
+ m_inst_list.push_back(*p);
+ p++;
+ }
+}
+
+uint8_t * StreamDPVmInstructions::get_program(){
+ return (&m_inst_list[0]);
+}
+
+uint32_t StreamDPVmInstructions::get_program_size(){
+ return (m_inst_list.size());
+}
+
+void StreamDPVmInstructions::Dump(FILE *fd){
+
+ uint8_t * p=get_program();
+
+
+ uint32_t program_size = get_program_size();
+ uint8_t * p_end=p+program_size;
+
+ StreamDPOpFlowVar8 *lpv8;
+ StreamDPOpFlowVar16 *lpv16;
+ StreamDPOpFlowVar32 *lpv32;
+ StreamDPOpFlowVar64 *lpv64;
+ StreamDPOpIpv4Fix *lpIpv4Fix;
+ StreamDPOpPktWr8 *lpw8;
+ StreamDPOpPktWr16 *lpw16;
+ StreamDPOpPktWr32 *lpw32;
+ StreamDPOpPktWr64 *lpw64;
+ StreamDPOpClientsLimit *lp_client;
+ StreamDPOpClientsUnLimit *lp_client_unlimited;
+
+
+ while ( p < p_end) {
+ uint8_t op_code=*p;
+ switch (op_code) {
+
+ case ditINC8 :
+ lpv8 =(StreamDPOpFlowVar8 *)p;
+ lpv8->dump(fd,"INC8");
+ p+=sizeof(StreamDPOpFlowVar8);
+ break;
+ case ditINC16 :
+ lpv16 =(StreamDPOpFlowVar16 *)p;
+ lpv16->dump(fd,"INC16");
+ p+=sizeof(StreamDPOpFlowVar16);
+ break;
+ case ditINC32 :
+ lpv32 =(StreamDPOpFlowVar32 *)p;
+ lpv32->dump(fd,"INC32");
+ p+=sizeof(StreamDPOpFlowVar32);
+ break;
+ case ditINC64 :
+ lpv64 =(StreamDPOpFlowVar64 *)p;
+ lpv64->dump(fd,"INC64");
+ p+=sizeof(StreamDPOpFlowVar64);
+ break;
+
+ case ditDEC8 :
+ lpv8 =(StreamDPOpFlowVar8 *)p;
+ lpv8->dump(fd,"DEC8");
+ p+=sizeof(StreamDPOpFlowVar8);
+ break;
+ case ditDEC16 :
+ lpv16 =(StreamDPOpFlowVar16 *)p;
+ lpv16->dump(fd,"DEC16");
+ p+=sizeof(StreamDPOpFlowVar16);
+ break;
+ case ditDEC32 :
+ lpv32 =(StreamDPOpFlowVar32 *)p;
+ lpv32->dump(fd,"DEC32");
+ p+=sizeof(StreamDPOpFlowVar32);
+ break;
+ case ditDEC64 :
+ lpv64 =(StreamDPOpFlowVar64 *)p;
+ lpv64->dump(fd,"DEC64");
+ p+=sizeof(StreamDPOpFlowVar64);
+ break;
+
+ case ditRANDOM8 :
+ lpv8 =(StreamDPOpFlowVar8 *)p;
+ lpv8->dump(fd,"RAND8");
+ p+=sizeof(StreamDPOpFlowVar8);
+ break;
+ case ditRANDOM16 :
+ lpv16 =(StreamDPOpFlowVar16 *)p;
+ lpv16->dump(fd,"RAND16");
+ p+=sizeof(StreamDPOpFlowVar16);
+ break;
+ case ditRANDOM32 :
+ lpv32 =(StreamDPOpFlowVar32 *)p;
+ lpv32->dump(fd,"RAND32");
+ p+=sizeof(StreamDPOpFlowVar32);
+ break;
+ case ditRANDOM64 :
+ lpv64 =(StreamDPOpFlowVar64 *)p;
+ lpv64->dump(fd,"RAND64");
+ p+=sizeof(StreamDPOpFlowVar64);
+ break;
+
+ case ditFIX_IPV4_CS :
+ lpIpv4Fix =(StreamDPOpIpv4Fix *)p;
+ lpIpv4Fix->dump(fd,"Ipv4Fix");
+ p+=sizeof(StreamDPOpIpv4Fix);
+ break;
+
+ case itPKT_WR8 :
+ lpw8 =(StreamDPOpPktWr8 *)p;
+ lpw8->dump(fd,"Wr8");
+ p+=sizeof(StreamDPOpPktWr8);
+ break;
+
+ case itPKT_WR16 :
+ lpw16 =(StreamDPOpPktWr16 *)p;
+ lpw16->dump(fd,"Wr16");
+ p+=sizeof(StreamDPOpPktWr16);
+ break;
+
+ case itPKT_WR32 :
+ lpw32 =(StreamDPOpPktWr32 *)p;
+ lpw32->dump(fd,"Wr32");
+ p+=sizeof(StreamDPOpPktWr32);
+ break;
+
+ case itPKT_WR64 :
+ lpw64 =(StreamDPOpPktWr64 *)p;
+ lpw64->dump(fd,"Wr64");
+ p+=sizeof(StreamDPOpPktWr64);
+ break;
+
+ case itCLIENT_VAR :
+ lp_client =(StreamDPOpClientsLimit *)p;
+ lp_client->dump(fd,"Client");
+ p+=sizeof(StreamDPOpClientsLimit);
+ break;
+
+ case itCLIENT_VAR_UNLIMIT :
+ lp_client_unlimited =(StreamDPOpClientsUnLimit *)p;
+ lp_client_unlimited->dump(fd,"ClientUnlimted");
+ p+=sizeof(StreamDPOpClientsUnLimit);
+ break;
+
+
+ default:
+ assert(0);
+ }
+ };
+}
+
+
+void StreamDPOpFlowVar8::dump(FILE *fd,std::string opt){
+ fprintf(fd," %10s op:%lu, of:%lu, (%lu- %lu) \n", opt.c_str(),(ulong)m_op,(ulong)m_flow_offset,(ulong)m_min_val,(ulong)m_max_val);
+}
+
+void StreamDPOpFlowVar16::dump(FILE *fd,std::string opt){
+ fprintf(fd," %10s op:%lu, of:%lu, (%lu-%lu) \n", opt.c_str(),(ulong)m_op,(ulong)m_flow_offset,(ulong)m_min_val,(ulong)m_max_val);
+}
+
+void StreamDPOpFlowVar32::dump(FILE *fd,std::string opt){
+ fprintf(fd," %10s op:%lu, of:%lu, (%lu-%lu) \n", opt.c_str(),(ulong)m_op,(ulong)m_flow_offset,(ulong)m_min_val,(ulong)m_max_val);
+}
+
+void StreamDPOpFlowVar64::dump(FILE *fd,std::string opt){
+ fprintf(fd," %10s op:%lu, of:%lu, (%lu-%lu) \n", opt.c_str(),(ulong)m_op,(ulong)m_flow_offset,(ulong)m_min_val,(ulong)m_max_val);
+}
+
+void StreamDPOpPktWr8::dump(FILE *fd,std::string opt){
+ fprintf(fd," %10s op:%lu, flags:%lu, pkt_of:%lu, f_of:%lu \n", opt.c_str(),(ulong)m_op,(ulong)m_flags,(ulong)m_pkt_offset,(ulong)m_offset);
}
+void StreamDPOpPktWr16::dump(FILE *fd,std::string opt){
+ fprintf(fd," %10s op:%lu, flags:%lu, pkt_of:%lu , f_of:%lu \n", opt.c_str(),(ulong)m_op,(ulong)m_flags,(ulong)m_pkt_offset,(ulong)m_offset);
+}
+
+void StreamDPOpPktWr32::dump(FILE *fd,std::string opt){
+ fprintf(fd," %10s op:%lu, flags:%lu, pkt_of:%lu , f_of:%lu \n", opt.c_str(),(ulong)m_op,(ulong)m_flags,(ulong)m_pkt_offset,(ulong)m_offset);
+}
+
+void StreamDPOpPktWr64::dump(FILE *fd,std::string opt){
+ fprintf(fd," %10s op:%lu, flags:%lu, pkt_of:%lu , f_of:%lu \n", opt.c_str(),(ulong)m_op,(ulong)m_flags,(ulong)m_pkt_offset,(ulong)m_offset);
+}
+
+
+void StreamDPOpIpv4Fix::dump(FILE *fd,std::string opt){
+ fprintf(fd," %10s op:%lu, offset: %lu \n", opt.c_str(),(ulong)m_op,(ulong)m_offset);
+}
+
+
+void StreamDPOpClientsLimit::dump(FILE *fd,std::string opt){
+ fprintf(fd," %10s op:%lu, flow_offset: %lu (%x-%x) (%x-%x) flow_limit :%lu flags:%x \n", opt.c_str(),(ulong)m_op,(ulong)m_flow_offset,m_min_ip,m_max_ip,m_min_port,m_max_port,(ulong)m_limit_flows,m_flags);
+}
+
+void StreamDPOpClientsUnLimit::dump(FILE *fd,std::string opt){
+ fprintf(fd," %10s op:%lu, flow_offset: %lu (%x-%x) flags:%x \n", opt.c_str(),(ulong)m_op,(ulong)m_flow_offset,m_min_ip,m_max_ip,m_flags);
+}
+
+
diff --git a/src/stateless/cp/trex_stream_vm.h b/src/stateless/cp/trex_stream_vm.h
index 56edbcaf..e65a87e3 100644
--- a/src/stateless/cp/trex_stream_vm.h
+++ b/src/stateless/cp/trex_stream_vm.h
@@ -24,6 +24,526 @@ limitations under the License.
#include <string>
#include <stdint.h>
#include <vector>
+#include <unordered_map>
+#include <assert.h>
+#include <common/Network/Packet/IPHeader.h>
+#include "pal_utl.h"
+#include "mbuf.h"
+
+
+
+
+class StreamVm;
+
+
+/* in memory program */
+
+struct StreamDPOpFlowVar8 {
+ uint8_t m_op;
+ uint8_t m_flow_offset;
+ uint8_t m_min_val;
+ uint8_t m_max_val;
+public:
+ void dump(FILE *fd,std::string opt);
+
+ inline void run_inc(uint8_t * flow_var) {
+ uint8_t * p=(flow_var+m_flow_offset);
+ *p=*p+1;
+ if (*p>m_max_val) {
+ *p=m_min_val;
+ }
+ }
+
+ inline void run_dec(uint8_t * flow_var) {
+ uint8_t * p=(flow_var+m_flow_offset);
+ *p=*p-1;
+ if (*p<m_min_val) {
+ *p=m_max_val;
+ }
+ }
+
+ inline void run_rand(uint8_t * flow_var) {
+ uint8_t * p=(flow_var+m_flow_offset);
+ *p= m_min_val + (rand() % (int)(m_max_val - m_min_val + 1));
+ }
+
+
+} __attribute__((packed)) ;
+
+struct StreamDPOpFlowVar16 {
+ uint8_t m_op;
+ uint8_t m_flow_offset;
+ uint16_t m_min_val;
+ uint16_t m_max_val;
+public:
+ void dump(FILE *fd,std::string opt);
+
+ inline void run_inc(uint8_t * flow_var) {
+ uint16_t * p=(uint16_t *)(flow_var+m_flow_offset);
+ *p=*p+1;
+ if (*p>m_max_val) {
+ *p=m_min_val;
+ }
+ }
+
+ inline void run_dec(uint8_t * flow_var) {
+ uint16_t * p=(uint16_t *)(flow_var+m_flow_offset);
+ *p=*p-1;
+ if (*p<m_min_val) {
+ *p=m_max_val;
+ }
+ }
+
+ inline void run_rand(uint8_t * flow_var) {
+ uint16_t * p=(uint16_t *)(flow_var+m_flow_offset);
+ *p= m_min_val + (rand() % (int)(m_max_val - m_min_val + 1));
+ }
+
+
+
+} __attribute__((packed)) ;
+
+struct StreamDPOpFlowVar32 {
+ uint8_t m_op;
+ uint8_t m_flow_offset;
+ uint32_t m_min_val;
+ uint32_t m_max_val;
+public:
+ void dump(FILE *fd,std::string opt);
+
+ inline void run_inc(uint8_t * flow_var) {
+ uint32_t * p=(uint32_t *)(flow_var+m_flow_offset);
+ *p=*p+1;
+ if (*p>m_max_val) {
+ *p=m_min_val;
+ }
+ }
+
+ inline void run_dec(uint8_t * flow_var) {
+ uint32_t * p=(uint32_t *)(flow_var+m_flow_offset);
+ *p=*p-1;
+ if (*p<m_min_val) {
+ *p=m_max_val;
+ }
+ }
+
+ inline void run_rand(uint8_t * flow_var) {
+ uint32_t * p=(uint32_t *)(flow_var+m_flow_offset);
+ *p= m_min_val + (rand() % (int)(m_max_val - m_min_val + 1));
+ }
+
+} __attribute__((packed)) ;
+
+struct StreamDPOpFlowVar64 {
+ uint8_t m_op;
+ uint8_t m_flow_offset;
+ uint64_t m_min_val;
+ uint64_t m_max_val;
+public:
+ void dump(FILE *fd,std::string opt);
+
+ inline void run_inc(uint8_t * flow_var) {
+ uint64_t * p=(uint64_t *)(flow_var+m_flow_offset);
+ *p=*p+1;
+ if (*p>m_max_val) {
+ *p=m_min_val;
+ }
+ }
+
+ inline void run_dec(uint8_t * flow_var) {
+ uint64_t * p=(uint64_t *)(flow_var+m_flow_offset);
+ *p=*p-1;
+ if (*p<m_min_val) {
+ *p=m_max_val;
+ }
+ }
+
+ inline void run_rand(uint8_t * flow_var) {
+ uint64_t * p=(uint64_t *)(flow_var+m_flow_offset);
+ *p= m_min_val + (rand() % (int)(m_max_val - m_min_val + 1));
+ }
+
+
+} __attribute__((packed)) ;
+
+
+struct StreamDPOpPktWrBase {
+ enum {
+ PKT_WR_IS_BIG = 1
+ }; /* for flags */
+
+ uint8_t m_op;
+ uint8_t m_flags;
+ uint8_t m_offset;
+
+public:
+ bool is_big(){
+ return ( (m_flags &StreamDPOpPktWrBase::PKT_WR_IS_BIG) == StreamDPOpPktWrBase::PKT_WR_IS_BIG ?true:false);
+ }
+
+} __attribute__((packed)) ;
+
+
+struct StreamDPOpPktWr8 : public StreamDPOpPktWrBase {
+ int8_t m_val_offset;
+ uint16_t m_pkt_offset;
+
+public:
+ void dump(FILE *fd,std::string opt);
+
+ inline void wr(uint8_t * flow_var_base,uint8_t * pkt_base) {
+ uint8_t * p_pkt = (pkt_base+m_pkt_offset);
+ uint8_t * p_flow_var = (flow_var_base+m_offset);
+ *p_pkt=(*p_flow_var+m_val_offset);
+
+ }
+
+
+} __attribute__((packed)) ;
+
+
+struct StreamDPOpPktWr16 : public StreamDPOpPktWrBase {
+ uint16_t m_pkt_offset;
+ int16_t m_val_offset;
+public:
+ void dump(FILE *fd,std::string opt);
+
+ inline void wr(uint8_t * flow_var_base,uint8_t * pkt_base) {
+ uint16_t * p_pkt = (uint16_t*)(pkt_base+m_pkt_offset);
+ uint16_t * p_flow_var = (uint16_t*)(flow_var_base+m_offset);
+
+ if ( likely(is_big())){
+ *p_pkt=PKT_HTONS((*p_flow_var+m_val_offset));
+ }else{
+ *p_pkt=(*p_flow_var+m_val_offset);
+ }
+
+ }
+} __attribute__((packed));
+
+struct StreamDPOpPktWr32 : public StreamDPOpPktWrBase {
+ uint16_t m_pkt_offset;
+ int32_t m_val_offset;
+public:
+ void dump(FILE *fd,std::string opt);
+
+ inline void wr(uint8_t * flow_var_base,uint8_t * pkt_base) {
+ uint32_t * p_pkt = (uint32_t*)(pkt_base+m_pkt_offset);
+ uint32_t * p_flow_var = (uint32_t*)(flow_var_base+m_offset);
+ if ( likely(is_big())){
+ *p_pkt=PKT_HTONL((*p_flow_var+m_val_offset));
+ }else{
+ *p_pkt=(*p_flow_var+m_val_offset);
+ }
+ }
+
+} __attribute__((packed));
+
+struct StreamDPOpPktWr64 : public StreamDPOpPktWrBase {
+ uint16_t m_pkt_offset;
+ int64_t m_val_offset;
+
+public:
+ void dump(FILE *fd,std::string opt);
+
+ inline void wr(uint8_t * flow_var_base,uint8_t * pkt_base) {
+ uint64_t * p_pkt = (uint64_t*)(pkt_base+m_pkt_offset);
+ uint64_t * p_flow_var = (uint64_t*)(flow_var_base+m_offset);
+ if ( likely(is_big())){
+ *p_pkt=pal_ntohl64((*p_flow_var+m_val_offset));
+ }else{
+ *p_pkt=(*p_flow_var+m_val_offset);
+ }
+ }
+
+
+} __attribute__((packed));
+
+struct StreamDPOpIpv4Fix {
+ uint8_t m_op;
+ uint16_t m_offset;
+public:
+ void dump(FILE *fd,std::string opt);
+ void run(uint8_t * pkt_base){
+ IPHeader * ipv4= (IPHeader *)(pkt_base+m_offset);
+ ipv4->updateCheckSum();
+ }
+
+} __attribute__((packed));
+
+
+/* flow varible of Client command */
+struct StreamDPFlowClient {
+ uint32_t cur_ip;
+ uint16_t cur_port;
+ uint32_t cur_flow_id;
+} __attribute__((packed));
+
+
+struct StreamDPOpClientsLimit {
+ uint8_t m_op;
+ uint8_t m_flow_offset; /* offset into the flow var bytes */
+ uint8_t m_flags;
+ uint8_t m_pad;
+ uint16_t m_min_port;
+ uint16_t m_max_port;
+
+ uint32_t m_min_ip;
+ uint32_t m_max_ip;
+ uint32_t m_limit_flows; /* limit the number of flows */
+
+public:
+ void dump(FILE *fd,std::string opt);
+ inline void run(uint8_t * flow_var_base) {
+ StreamDPFlowClient * lp= (StreamDPFlowClient *)(flow_var_base+m_flow_offset);
+ lp->cur_ip++;
+ if (lp->cur_ip > m_max_ip ) {
+ lp->cur_ip= m_min_ip;
+ lp->cur_port++;
+ if (lp->cur_port > m_max_port) {
+ lp->cur_port = m_min_port;
+ }
+ }
+
+ if (m_limit_flows) {
+ lp->cur_flow_id++;
+ if ( lp->cur_flow_id > m_limit_flows ){
+ /* reset to the first flow */
+ lp->cur_flow_id = 1;
+ lp->cur_ip = m_min_ip;
+ lp->cur_port = m_min_port;
+ }
+ }
+ }
+
+
+} __attribute__((packed));
+
+struct StreamDPOpClientsUnLimit {
+ enum __MIN_PORT {
+ CLIENT_UNLIMITED_MIN_PORT = 1025
+ };
+
+ uint8_t m_op;
+ uint8_t m_flow_offset; /* offset into the flow var bytes */
+ uint8_t m_flags;
+ uint8_t m_pad;
+ uint32_t m_min_ip;
+ uint32_t m_max_ip;
+
+public:
+ void dump(FILE *fd,std::string opt);
+ inline void run(uint8_t * flow_var_base) {
+ StreamDPFlowClient * lp= (StreamDPFlowClient *)(flow_var_base+m_flow_offset);
+ lp->cur_ip++;
+ if (lp->cur_ip > m_max_ip ) {
+ lp->cur_ip= m_min_ip;
+ lp->cur_port++;
+ if (lp->cur_port == 0) {
+ lp->cur_port = StreamDPOpClientsUnLimit::CLIENT_UNLIMITED_MIN_PORT;
+ }
+ }
+ }
+
+} __attribute__((packed));
+
+
+/* datapath instructions */
+class StreamDPVmInstructions {
+public:
+ enum INS_TYPE {
+ ditINC8 ,
+ ditINC16 ,
+ ditINC32 ,
+ ditINC64 ,
+
+ ditDEC8 ,
+ ditDEC16 ,
+ ditDEC32 ,
+ ditDEC64 ,
+
+ ditRANDOM8 ,
+ ditRANDOM16 ,
+ ditRANDOM32 ,
+ ditRANDOM64 ,
+
+ ditFIX_IPV4_CS ,
+
+ itPKT_WR8 ,
+ itPKT_WR16 ,
+ itPKT_WR32 ,
+ itPKT_WR64 ,
+ itCLIENT_VAR ,
+ itCLIENT_VAR_UNLIMIT
+ };
+
+
+public:
+ void clear();
+ void add_command(void *buffer,uint16_t size);
+ uint8_t * get_program();
+ uint32_t get_program_size();
+
+
+ void Dump(FILE *fd);
+
+
+private:
+ std::vector<uint8_t> m_inst_list;
+};
+
+
+class StreamDPVmInstructionsRunner {
+public:
+ inline void run(uint32_t program_size,
+ uint8_t * program, /* program */
+ uint8_t * flow_var, /* flow var */
+ uint8_t * pkt); /* pkt */
+
+};
+
+
+inline void StreamDPVmInstructionsRunner::run(uint32_t program_size,
+ uint8_t * program, /* program */
+ uint8_t * flow_var, /* flow var */
+ uint8_t * pkt){
+
+
+ uint8_t * p=program;
+ uint8_t * p_end=p+program_size;
+
+
+ union ua_ {
+ StreamDPOpFlowVar8 *lpv8;
+ StreamDPOpFlowVar16 *lpv16;
+ StreamDPOpFlowVar32 *lpv32;
+ StreamDPOpFlowVar64 *lpv64;
+ StreamDPOpIpv4Fix *lpIpv4Fix;
+ StreamDPOpPktWr8 *lpw8;
+ StreamDPOpPktWr16 *lpw16;
+ StreamDPOpPktWr32 *lpw32;
+ StreamDPOpPktWr64 *lpw64;
+ StreamDPOpClientsLimit *lpcl;
+ StreamDPOpClientsUnLimit *lpclu;
+ } ua ;
+
+ while ( p < p_end) {
+ uint8_t op_code=*p;
+ switch (op_code) {
+
+ case StreamDPVmInstructions::itCLIENT_VAR :
+ ua.lpcl =(StreamDPOpClientsLimit *)p;
+ ua.lpcl->run(flow_var);
+ p+=sizeof(StreamDPOpClientsLimit);
+ break;
+
+ case StreamDPVmInstructions::itCLIENT_VAR_UNLIMIT :
+ ua.lpclu =(StreamDPOpClientsUnLimit *)p;
+ ua.lpclu->run(flow_var);
+ p+=sizeof(StreamDPOpClientsUnLimit);
+ break;
+
+ case StreamDPVmInstructions::ditINC8 :
+ ua.lpv8 =(StreamDPOpFlowVar8 *)p;
+ ua.lpv8->run_inc(flow_var);
+ p+=sizeof(StreamDPOpFlowVar8);
+ break;
+
+ case StreamDPVmInstructions::ditINC16 :
+ ua.lpv16 =(StreamDPOpFlowVar16 *)p;
+ ua.lpv16->run_inc(flow_var);
+ p+=sizeof(StreamDPOpFlowVar16);
+ break;
+ case StreamDPVmInstructions::ditINC32 :
+ ua.lpv32 =(StreamDPOpFlowVar32 *)p;
+ ua.lpv32->run_inc(flow_var);
+ p+=sizeof(StreamDPOpFlowVar32);
+ break;
+ case StreamDPVmInstructions::ditINC64 :
+ ua.lpv64 =(StreamDPOpFlowVar64 *)p;
+ ua.lpv64->run_inc(flow_var);
+ p+=sizeof(StreamDPOpFlowVar64);
+ break;
+
+ case StreamDPVmInstructions::ditDEC8 :
+ ua.lpv8 =(StreamDPOpFlowVar8 *)p;
+ ua.lpv8->run_dec(flow_var);
+ p+=sizeof(StreamDPOpFlowVar8);
+ break;
+ case StreamDPVmInstructions::ditDEC16 :
+ ua.lpv16 =(StreamDPOpFlowVar16 *)p;
+ ua.lpv16->run_dec(flow_var);
+ p+=sizeof(StreamDPOpFlowVar16);
+ break;
+ case StreamDPVmInstructions::ditDEC32 :
+ ua.lpv32 =(StreamDPOpFlowVar32 *)p;
+ ua.lpv32->run_dec(flow_var);
+ p+=sizeof(StreamDPOpFlowVar32);
+ break;
+ case StreamDPVmInstructions::ditDEC64 :
+ ua.lpv64 =(StreamDPOpFlowVar64 *)p;
+ ua.lpv64->run_dec(flow_var);
+ p+=sizeof(StreamDPOpFlowVar64);
+ break;
+
+ case StreamDPVmInstructions::ditRANDOM8 :
+ ua.lpv8 =(StreamDPOpFlowVar8 *)p;
+ ua.lpv8->run_rand(flow_var);
+ p+=sizeof(StreamDPOpFlowVar8);
+ break;
+ case StreamDPVmInstructions::ditRANDOM16 :
+ ua.lpv16 =(StreamDPOpFlowVar16 *)p;
+ ua.lpv16->run_rand(flow_var);
+ p+=sizeof(StreamDPOpFlowVar16);
+ break;
+ case StreamDPVmInstructions::ditRANDOM32 :
+ ua.lpv32 =(StreamDPOpFlowVar32 *)p;
+ ua.lpv32->run_rand(flow_var);
+ p+=sizeof(StreamDPOpFlowVar32);
+ break;
+ case StreamDPVmInstructions::ditRANDOM64 :
+ ua.lpv64 =(StreamDPOpFlowVar64 *)p;
+ ua.lpv64->run_rand(flow_var);
+ p+=sizeof(StreamDPOpFlowVar64);
+ break;
+
+ case StreamDPVmInstructions::ditFIX_IPV4_CS :
+ ua.lpIpv4Fix =(StreamDPOpIpv4Fix *)p;
+ ua.lpIpv4Fix->run(pkt);
+ p+=sizeof(StreamDPOpIpv4Fix);
+ break;
+
+ case StreamDPVmInstructions::itPKT_WR8 :
+ ua.lpw8 =(StreamDPOpPktWr8 *)p;
+ ua.lpw8->wr(flow_var,pkt);
+ p+=sizeof(StreamDPOpPktWr8);
+ break;
+
+ case StreamDPVmInstructions::itPKT_WR16 :
+ ua.lpw16 =(StreamDPOpPktWr16 *)p;
+ ua.lpw16->wr(flow_var,pkt);
+ p+=sizeof(StreamDPOpPktWr16);
+ break;
+
+ case StreamDPVmInstructions::itPKT_WR32 :
+ ua.lpw32 =(StreamDPOpPktWr32 *)p;
+ ua.lpw32->wr(flow_var,pkt);
+ p+=sizeof(StreamDPOpPktWr32);
+ break;
+
+ case StreamDPVmInstructions::itPKT_WR64 :
+ ua.lpw64 =(StreamDPOpPktWr64 *)p;
+ ua.lpw64->wr(flow_var,pkt);
+ p+=sizeof(StreamDPOpPktWr64);
+ break;
+ default:
+ assert(0);
+ }
+ };
+};
+
+
+
/**
* interface for stream VM instruction
@@ -32,8 +552,24 @@ limitations under the License.
class StreamVmInstruction {
public:
+ enum INS_TYPE {
+ itNONE = 0,
+ itFIX_IPV4_CS = 4,
+ itFLOW_MAN = 5,
+ itPKT_WR = 6,
+ itFLOW_CLIENT = 7
+
+ };
+
+ typedef uint8_t instruction_type_t ;
+
+ virtual instruction_type_t get_instruction_type()=0;
+
virtual ~StreamVmInstruction();
+ virtual void Dump(FILE *fd)=0;
+
+
private:
static const std::string m_name;
};
@@ -48,8 +584,14 @@ public:
}
-private:
- uint16_t m_pkt_offset;
+ virtual instruction_type_t get_instruction_type(){
+ return ( StreamVmInstruction::itFIX_IPV4_CS);
+ }
+
+ virtual void Dump(FILE *fd);
+
+public:
+ uint16_t m_pkt_offset; /* the offset of IPv4 header from the start of the packet */
};
/**
@@ -61,6 +603,10 @@ class StreamVmInstructionFlowMan : public StreamVmInstruction {
public:
+ virtual instruction_type_t get_instruction_type(){
+ return ( StreamVmInstruction::itFLOW_MAN);
+ }
+
/**
* different types of operations on the object
*/
@@ -85,7 +631,16 @@ public:
}
+ virtual void Dump(FILE *fd);
+
+ void sanity_check(uint32_t ins_id,StreamVm *lp);
+
private:
+ void sanity_check_valid_range(uint32_t ins_id,StreamVm *lp);
+ void sanity_check_valid_size(uint32_t ins_id,StreamVm *lp);
+ void sanity_check_valid_opt(uint32_t ins_id,StreamVm *lp);
+
+public:
/* flow var name */
@@ -105,6 +660,83 @@ private:
};
+
+/**
+ * flow client instruction - save state for client range and port range
+ *
+ * @author hhaim
+ */
+class StreamVmInstructionFlowClient : public StreamVmInstruction {
+
+public:
+ enum client_flags_e {
+ CLIENT_F_UNLIMITED_FLOWS=1, /* unlimited number of flow, don't care about ports */
+ };
+
+
+ virtual instruction_type_t get_instruction_type(){
+ return ( StreamVmInstruction::itFLOW_CLIENT);
+ }
+
+
+ StreamVmInstructionFlowClient(const std::string &var_name,
+ uint32_t client_min_value,
+ uint32_t client_max_value,
+ uint16_t port_min,
+ uint16_t port_max,
+ uint32_t limit_num_flows, /* zero means don't limit */
+ uint16_t flags
+ ) {
+ m_var_name = var_name;
+ m_client_min = client_min_value;
+ m_client_max = client_max_value;
+
+ m_port_min = port_min;
+ m_port_max = port_max;
+
+ m_limit_num_flows = limit_num_flows;
+ m_flags = flags;
+ }
+
+ virtual void Dump(FILE *fd);
+
+ static uint8_t get_flow_var_size(){
+ return (4+2+4);
+ }
+
+ bool is_unlimited_flows(){
+ return ( (m_flags & StreamVmInstructionFlowClient::CLIENT_F_UNLIMITED_FLOWS ) ==
+ StreamVmInstructionFlowClient::CLIENT_F_UNLIMITED_FLOWS );
+ }
+public:
+
+
+ /* flow var name */
+ std::string m_var_name;
+
+ uint32_t m_client_min; // min ip
+ uint32_t m_client_max; // max ip
+ uint16_t m_port_min; // start port
+ uint16_t m_port_max; // start port
+ uint32_t m_limit_num_flows; // number of flows
+ uint16_t m_flags;
+};
+
+
+
+class VmFlowVarRec {
+public:
+ uint32_t m_offset;
+ union {
+ StreamVmInstructionFlowMan * m_ins_flowv;
+ StreamVmInstructionFlowClient * m_ins_flow_client;
+ } m_ins;
+ uint8_t m_size_bytes;
+};
+
+
+
+
/**
* write flow var to packet
*
@@ -121,7 +753,14 @@ public:
m_pkt_offset(pkt_offset),
m_add_value(add_value),
m_is_big_endian(is_big_endian) {}
-private:
+
+ virtual instruction_type_t get_instruction_type(){
+ return ( StreamVmInstruction::itPKT_WR);
+ }
+
+ virtual void Dump(FILE *fd);
+
+public:
/* flow var name to write */
std::string m_flow_var_name;
@@ -136,12 +775,161 @@ private:
bool m_is_big_endian;
};
+
+/**
+ * describes a VM program for DP
+ *
+ */
+
+class StreamVmDp {
+public:
+ StreamVmDp(){
+ m_bss_ptr=NULL;
+ m_program_ptr =NULL ;
+ m_bss_size=0;
+ m_program_size=0;
+ m_max_pkt_offset_change=0;
+ }
+
+ StreamVmDp( uint8_t * bss,
+ uint16_t bss_size,
+ uint8_t * prog,
+ uint16_t prog_size,
+ uint16_t max_pkt_offset
+ ){
+
+ if (bss) {
+ assert(bss_size);
+ m_bss_ptr=(uint8_t*)malloc(bss_size);
+ assert(m_bss_ptr);
+ memcpy(m_bss_ptr,bss,bss_size);
+ m_bss_size=bss_size;
+ }else{
+ m_bss_ptr=NULL;
+ m_bss_size=0;
+ }
+
+ if (prog) {
+ assert(prog_size);
+ m_program_ptr=(uint8_t*)malloc(prog_size);
+ memcpy(m_program_ptr,prog,prog_size);
+ m_program_size = prog_size;
+ }else{
+ m_program_ptr = NULL;
+ m_program_size=0;
+ }
+ m_max_pkt_offset_change =max_pkt_offset;
+ }
+
+ ~StreamVmDp(){
+ if (m_bss_ptr) {
+ free(m_bss_ptr);
+ m_bss_ptr=0;
+ m_bss_size=0;
+ }
+ if (m_program_ptr) {
+ free(m_program_ptr);
+ m_program_size=0;
+ m_program_ptr=0;
+ }
+ }
+
+ StreamVmDp * clone() const {
+ StreamVmDp * lp= new StreamVmDp(m_bss_ptr,
+ m_bss_size,
+ m_program_ptr,
+ m_program_size,
+ m_max_pkt_offset_change
+ );
+ assert(lp);
+ return (lp);
+ }
+
+ uint8_t* clone_bss(){
+ assert(m_bss_size>0);
+ uint8_t *p=(uint8_t *)malloc(m_bss_size);
+ assert(p);
+ memcpy(p,m_bss_ptr,m_bss_size);
+ return (p);
+ }
+
+ uint16_t get_bss_size(){
+ return(m_bss_size);
+ }
+
+ uint8_t* get_bss(){
+ return (m_bss_ptr);
+ }
+
+ uint8_t* get_program(){
+ return (m_program_ptr);
+ }
+
+ uint16_t get_program_size(){
+ return (m_program_size);
+ }
+
+ uint16_t get_max_packet_update_offset(){
+ return (m_max_pkt_offset_change);
+ }
+
+private:
+ uint8_t * m_bss_ptr; /* pointer to the data section */
+ uint8_t * m_program_ptr; /* pointer to the program */
+ uint16_t m_bss_size;
+ uint16_t m_program_size; /* program size*/
+ uint16_t m_max_pkt_offset_change;
+
+};
+
+
/**
* describes a VM program
*
*/
class StreamVm {
+
public:
+ enum STREAM_VM {
+ svMAX_FLOW_VAR = 64, /* maximum flow varible */
+ svMAX_PACKET_OFFSET_CHANGE = 512
+ };
+
+
+
+ StreamVm(){
+ m_bss=0;
+ m_pkt_size=0;
+ m_cur_var_offset=0;
+ }
+
+
+ /* set packet size */
+ void set_packet_size(uint16_t pkt_size){
+ m_pkt_size = pkt_size;
+ }
+
+ uint16_t get_packet_size(){
+ return ( m_pkt_size);
+ }
+
+
+ StreamVmDp * cloneAsVmDp(){
+
+ StreamVmDp * lp= new StreamVmDp(get_bss_ptr(),
+ get_bss_size(),
+ get_dp_instruction_buffer()->get_program(),
+ get_dp_instruction_buffer()->get_program_size(),
+ get_max_packet_update_offset()
+ );
+ assert(lp);
+ return (lp);
+
+ }
+
+ bool is_vm_empty() {
+ return (m_inst_list.size() == 0);
+ }
/**
* add new instruction to the VM
@@ -155,17 +943,75 @@ public:
*/
const std::vector<StreamVmInstruction *> & get_instruction_list();
+ StreamDPVmInstructions * get_dp_instruction_buffer();
+
+ uint16_t get_bss_size(){
+ return (m_cur_var_offset );
+ }
+
+ uint8_t * get_bss_ptr(){
+ return (m_bss );
+ }
+
+
+ uint16_t get_max_packet_update_offset(){
+ return ( m_max_field_update );
+ }
+
+
+
/**
* compile the VM
* return true of success, o.w false
*
*/
- bool compile();
+ void compile();
~StreamVm();
+ void Dump(FILE *fd);
+
+ /* raise exception */
+ void err(const std::string &err);
+
private:
+
+ /* lookup for varible offset, */
+ bool var_lookup(const std::string &var_name,VmFlowVarRec & var);
+
+ void var_clear_table();
+
+ bool var_add(const std::string &var_name,VmFlowVarRec & var);
+
+ uint16_t get_var_offset(const std::string &var_name);
+
+ void build_flow_var_table() ;
+
+ void build_bss();
+
+ void build_program();
+
+ void alloc_bss();
+
+ void free_bss();
+
+private:
+
+ void clean_max_field_cnt();
+
+ void add_field_cnt(uint16_t new_cnt);
+
+private:
+ uint16_t m_pkt_size;
+ uint16_t m_cur_var_offset;
+ uint16_t m_max_field_update; /* the location of the last byte that is going to be changed in the packet */
+
std::vector<StreamVmInstruction *> m_inst_list;
+ std::unordered_map<std::string, VmFlowVarRec> m_flow_var_offset;
+ uint8_t * m_bss;
+
+ StreamDPVmInstructions m_instructions;
+
};
#endif /* __TREX_STREAM_VM_API_H__ */
diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp
index b28989be..c4900e66 100644
--- a/src/stateless/cp/trex_streams_compiler.cpp
+++ b/src/stateless/cp/trex_streams_compiler.cpp
@@ -142,8 +142,9 @@ private:
/**************************************
* stream compiled object
*************************************/
-TrexStreamsCompiledObj::TrexStreamsCompiledObj(uint8_t port_id, double mul) : m_port_id(port_id), m_mul(mul) {
- m_all_continues=false;
+TrexStreamsCompiledObj::TrexStreamsCompiledObj(uint8_t port_id) {
+ m_port_id = port_id;
+ m_all_continues = false;
}
TrexStreamsCompiledObj::~TrexStreamsCompiledObj() {
@@ -155,53 +156,40 @@ TrexStreamsCompiledObj::~TrexStreamsCompiledObj() {
void
-TrexStreamsCompiledObj::add_compiled_stream(TrexStream * stream){
+TrexStreamsCompiledObj::add_compiled_stream(TrexStream *stream){
obj_st obj;
- obj.m_stream = stream->clone_as_dp();
+ obj.m_stream = stream;
m_objs.push_back(obj);
}
-void
-TrexStreamsCompiledObj::add_compiled_stream(TrexStream * stream,
- uint32_t my_dp_id, int next_dp_id) {
- obj_st obj;
-
- obj.m_stream = stream->clone_as_dp();
- /* compress the id's*/
- obj.m_stream->fix_dp_stream_id(my_dp_id,next_dp_id);
-
- m_objs.push_back(obj);
-}
-
-void TrexStreamsCompiledObj::Dump(FILE *fd){
- for (auto obj : m_objs) {
- obj.m_stream->Dump(fd);
- }
-}
-
-
TrexStreamsCompiledObj *
TrexStreamsCompiledObj::clone() {
- /* use multiplier of 1 to avoid double mult */
- TrexStreamsCompiledObj *new_compiled_obj = new TrexStreamsCompiledObj(m_port_id, 1);
+ TrexStreamsCompiledObj *new_compiled_obj = new TrexStreamsCompiledObj(m_port_id);
/**
* clone each element
*/
for (auto obj : m_objs) {
- new_compiled_obj->add_compiled_stream(obj.m_stream);
+ TrexStream *new_stream = obj.m_stream->clone_as_dp();
+ new_compiled_obj->add_compiled_stream(new_stream);
}
- new_compiled_obj->m_mul = m_mul;
-
return new_compiled_obj;
+
}
+void TrexStreamsCompiledObj::Dump(FILE *fd){
+ for (auto obj : m_objs) {
+ obj.m_stream->Dump(fd);
+ }
+}
+
+
void
TrexStreamsCompiler::add_warning(const std::string &warning) {
m_warnings.push_back("*** warning: " + warning);
@@ -219,7 +207,7 @@ TrexStreamsCompiler::check_stream(const TrexStream *stream) {
/* cont. stream can point only on itself */
if (stream->get_type() == TrexStream::stCONTINUOUS) {
if (stream->m_next_stream_id != -1) {
- ss << "continous stream '" << stream->m_stream_id << "' cannot point on another stream";
+ ss << "continous stream '" << stream->m_stream_id << "' cannot point to another stream";
err(ss.str());
}
}
@@ -381,12 +369,34 @@ TrexStreamsCompiler::pre_compile_check(const std::vector<TrexStream *> &streams,
* stream compiler
*************************************/
bool
-TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams,
- TrexStreamsCompiledObj &obj,
- std::string *fail_msg) {
+TrexStreamsCompiler::compile(uint8_t port_id,
+ const std::vector<TrexStream *> &streams,
+ std::vector<TrexStreamsCompiledObj *> &objs,
+ uint8_t dp_core_count,
+ double factor,
+ std::string *fail_msg) {
+
+ try {
+ return compile_internal(port_id,streams,objs,dp_core_count,factor,fail_msg);
+ } catch (const TrexException &ex) {
+ if (fail_msg) {
+ *fail_msg = ex.what();
+ } else {
+ std::cout << ex.what();
+ }
+ return false;
+ }
+
+}
+bool
+TrexStreamsCompiler::compile_internal(uint8_t port_id,
+ const std::vector<TrexStream *> &streams,
+ std::vector<TrexStreamsCompiledObj *> &objs,
+ uint8_t dp_core_count,
+ double factor,
+ std::string *fail_msg) {
#if 0
- fprintf(stdout,"------------pre compile \n");
for (auto stream : streams) {
stream->Dump(stdout);
}
@@ -397,49 +407,100 @@ TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams,
/* compile checks */
- try {
- pre_compile_check(streams,nodes);
- } catch (const TrexException &ex) {
- if (fail_msg) {
- *fail_msg = ex.what();
- } else {
- std::cout << ex.what();
+ pre_compile_check(streams, nodes);
+
+ /* check if all are cont. streams */
+ bool all_continues = true;
+ for (const auto stream : streams) {
+ if (stream->get_type() != TrexStream::stCONTINUOUS) {
+ all_continues = false;
+ break;
}
- return false;
}
+ /* allocate objects for all DP cores */
+ for (uint8_t i = 0; i < dp_core_count; i++) {
+ TrexStreamsCompiledObj *obj = new TrexStreamsCompiledObj(port_id);
+ obj->m_all_continues = all_continues;
+ objs.push_back(obj);
+ }
- bool all_continues=true;
- /* for now we do something trivial, */
+ /* compile all the streams */
for (auto stream : streams) {
/* skip non-enabled streams */
if (!stream->m_enabled) {
continue;
}
- if (stream->get_type() != TrexStream::stCONTINUOUS ) {
- all_continues=false;
- }
+
+ /* compile a single stream to all cores */
+ compile_stream(stream, factor, dp_core_count, objs, nodes);
+ }
- int new_id= nodes.get(stream->m_stream_id)->m_compressed_stream_id;
- assert(new_id>=0);
- uint32_t my_stream_id = (uint32_t)new_id;
- int my_next_stream_id=-1;
- if (stream->m_next_stream_id>=0) {
- my_next_stream_id=nodes.get(stream->m_next_stream_id)->m_compressed_stream_id;
- }
+ return true;
+}
- /* add it */
- obj.add_compiled_stream(stream,
- my_stream_id,
- my_next_stream_id
- );
-
+/**
+ * compiles a single stream to DP objects
+ *
+ * @author imarom (03-Dec-15)
+ *
+ */
+void
+TrexStreamsCompiler::compile_stream(const TrexStream *stream,
+ double factor,
+ uint8_t dp_core_count,
+ std::vector<TrexStreamsCompiledObj *> &objs,
+ GraphNodeMap &nodes) {
+
+
+ /* fix the stream ids */
+ int new_id = nodes.get(stream->m_stream_id)->m_compressed_stream_id;
+ assert(new_id >= 0);
+
+ int new_next_id = -1;
+ if (stream->m_next_stream_id >= 0) {
+ new_next_id = nodes.get(stream->m_next_stream_id)->m_compressed_stream_id;
}
- obj.m_all_continues =all_continues;
- return true;
+
+ /* calculate rate */
+ double per_core_rate = (stream->m_pps * (factor / dp_core_count));
+ int per_core_burst_total_pkts = (stream->m_burst_total_pkts / dp_core_count);
+
+ /* compile VM */
+ /* fix this const away problem */
+ ((TrexStream *)stream)->compile();
+
+ std::vector<TrexStream *> per_core_streams(dp_core_count);
+
+ /* for each core - creates its own version of the stream */
+ for (uint8_t i = 0; i < dp_core_count; i++) {
+ TrexStream *dp_stream = stream->clone_as_dp();
+
+ /* fix stream ID */
+ dp_stream->fix_dp_stream_id(new_id, new_next_id);
+
+
+ /* adjust rate and packets count */
+ dp_stream->m_pps = per_core_rate;
+ dp_stream->m_burst_total_pkts = per_core_burst_total_pkts;
+
+ per_core_streams[i] = dp_stream;
+ }
+
+ /* take care of remainder from a burst */
+ int burst_remainder = stream->m_burst_total_pkts - (per_core_burst_total_pkts * dp_core_count);
+ per_core_streams[0]->m_burst_total_pkts += burst_remainder;
+
+ /* attach the compiled stream of every core to its object */
+ for (uint8_t i = 0; i < dp_core_count; i++) {
+ objs[i]->add_compiled_stream(per_core_streams[i]);
+ }
+
+
}
+
/**************************************
* streams graph
*************************************/
@@ -486,10 +547,13 @@ TrexStreamsGraph::add_rate_events_for_stream_cont(double &offset_usec, const Tre
start_event.time = offset_usec + stream->m_isg_usec;
start_event.diff_pps = stream->get_pps();
start_event.diff_bps = stream->get_bps();
- m_graph_obj.add_rate_event(start_event);
+ m_graph_obj->add_rate_event(start_event);
/* no more events after this stream */
offset_usec = -1;
+
+ /* also mark we have an inifite time */
+ m_graph_obj->m_expected_duration = -1;
}
/**
@@ -510,13 +574,13 @@ TrexStreamsGraph::add_rate_events_for_stream_single_burst(double &offset_usec, c
start_event.time = offset_usec + stream->m_isg_usec;
start_event.diff_pps = stream->get_pps();
start_event.diff_bps = stream->get_bps();
- m_graph_obj.add_rate_event(start_event);
+ m_graph_obj->add_rate_event(start_event);
/* stop event */
stop_event.time = start_event.time + stream->get_burst_length_usec();
stop_event.diff_pps = -(start_event.diff_pps);
stop_event.diff_bps = -(start_event.diff_bps);
- m_graph_obj.add_rate_event(stop_event);
+ m_graph_obj->add_rate_event(stop_event);
/* next stream starts from here */
offset_usec = stop_event.time;
@@ -549,10 +613,10 @@ TrexStreamsGraph::add_rate_events_for_stream_multi_burst(double &offset_usec, co
for (int i = 0; i < stream->m_num_bursts; i++) {
start_event.time = offset_usec + delay;
- m_graph_obj.add_rate_event(start_event);
+ m_graph_obj->add_rate_event(start_event);
stop_event.time = start_event.time + stream->get_burst_length_usec();
- m_graph_obj.add_rate_event(stop_event);
+ m_graph_obj->add_rate_event(stop_event);
/* after the first burst, the delay is inter burst gap */
delay = stream->m_ibg_usec;
@@ -602,6 +666,7 @@ TrexStreamsGraph::generate_graph_for_one_root(uint32_t root_stream_id) {
/* loop detection */
auto search = loop_hash.find(stream->m_next_stream_id);
if (search != loop_hash.end()) {
+ m_graph_obj->on_loop_detection();
break;
}
@@ -615,8 +680,12 @@ TrexStreamsGraph::generate_graph_for_one_root(uint32_t root_stream_id) {
* see graph object for more details
*
*/
-const TrexStreamsGraphObj &
+const TrexStreamsGraphObj *
TrexStreamsGraph::generate(const std::vector<TrexStream *> &streams) {
+
+ /* main object to hold the graph - returned to the user */
+ m_graph_obj = new TrexStreamsGraphObj();
+
std::vector <uint32_t> root_streams;
/* before anything we create a hash streams ID
@@ -644,7 +713,7 @@ TrexStreamsGraph::generate(const std::vector<TrexStream *> &streams) {
}
- m_graph_obj.generate();
+ m_graph_obj->generate();
return m_graph_obj;
}
@@ -670,6 +739,11 @@ TrexStreamsGraphObj::find_max_rate() {
max_rate_bps = std::max(max_rate_bps, current_rate_bps);
}
+ /* if not mark as inifite - get the last event time */
+ if (m_expected_duration != -1) {
+ m_expected_duration = m_rate_events.back().time;
+ }
+
m_max_pps = max_rate_pps;
m_max_bps = max_rate_bps;
}
diff --git a/src/stateless/cp/trex_streams_compiler.h b/src/stateless/cp/trex_streams_compiler.h
index 70a31c5e..d2b0cd1d 100644
--- a/src/stateless/cp/trex_streams_compiler.h
+++ b/src/stateless/cp/trex_streams_compiler.h
@@ -38,9 +38,10 @@ class GraphNodeMap;
*/
class TrexStreamsCompiledObj {
friend class TrexStreamsCompiler;
+
public:
- TrexStreamsCompiledObj(uint8_t port_id, double m_mul);
+ TrexStreamsCompiledObj(uint8_t port_id);
~TrexStreamsCompiledObj();
struct obj_st {
@@ -56,32 +57,22 @@ public:
return (m_port_id);
}
- /**
- * clone the compiled object
- *
- */
- TrexStreamsCompiledObj * clone();
-
- double get_multiplier(){
- return (m_mul);
- }
-
bool get_all_streams_continues(){
return (m_all_continues);
}
void Dump(FILE *fd);
+ TrexStreamsCompiledObj* clone();
+
private:
- void add_compiled_stream(TrexStream * stream,
- uint32_t my_dp_id, int next_dp_id);
- void add_compiled_stream(TrexStream * stream);
+ void add_compiled_stream(TrexStream *stream);
+
std::vector<obj_st> m_objs;
bool m_all_continues;
uint8_t m_port_id;
- double m_mul;
};
class TrexStreamsCompiler {
@@ -93,7 +84,13 @@ public:
* @author imarom (28-Oct-15)
*
*/
- bool compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj, std::string *fail_msg = NULL);
+ bool compile(uint8_t port_id,
+ const std::vector<TrexStream *> &streams,
+ std::vector<TrexStreamsCompiledObj *> &objs,
+ uint8_t dp_core_count = 1,
+ double factor = 1.0,
+ std::string *fail_msg = NULL);
+
/**
*
@@ -106,6 +103,13 @@ public:
private:
+ bool compile_internal(uint8_t port_id,
+ const std::vector<TrexStream *> &streams,
+ std::vector<TrexStreamsCompiledObj *> &objs,
+ uint8_t dp_core_count,
+ double factor,
+ std::string *fail_msg);
+
void pre_compile_check(const std::vector<TrexStream *> &streams,
GraphNodeMap & nodes);
void allocate_pass(const std::vector<TrexStream *> &streams, GraphNodeMap *nodes);
@@ -115,8 +119,15 @@ private:
void add_warning(const std::string &warning);
void err(const std::string &err);
+ void compile_stream(const TrexStream *stream,
+ double factor,
+ uint8_t dp_core_count,
+ std::vector<TrexStreamsCompiledObj *> &objs,
+ GraphNodeMap &nodes);
+
+ void compile_stream_vm(TrexStream *stream);
+
std::vector<std::string> m_warnings;
-
};
class TrexStreamsGraph;
@@ -131,6 +142,12 @@ class TrexStreamsGraphObj {
public:
+ TrexStreamsGraphObj() {
+ m_max_pps = 0;
+ m_max_bps = 0;
+ m_expected_duration = 0;
+ }
+
/**
* rate event is defined by those:
* time - the time of the event on the timeline
@@ -153,12 +170,21 @@ public:
return m_max_bps;
}
+ int get_duration() const {
+ return m_expected_duration;
+ }
+
const std::list<rate_event_st> & get_events() const {
return m_rate_events;
}
+
private:
+ void on_loop_detection() {
+ m_expected_duration = -1;
+ }
+
void add_rate_event(const rate_event_st &ev) {
m_rate_events.push_back(ev);
}
@@ -166,11 +192,13 @@ private:
void generate();
void find_max_rate();
- double m_max_pps;
- double m_max_bps;
+ double m_max_pps;
+ double m_max_bps;
+ int m_expected_duration;
/* list of rate events */
std::list<rate_event_st> m_rate_events;
+
};
/**
@@ -181,11 +209,15 @@ private:
class TrexStreamsGraph {
public:
+ TrexStreamsGraph() {
+ m_graph_obj = NULL;
+ }
+
/**
* generate a sequence graph for streams
*
*/
- const TrexStreamsGraphObj & generate(const std::vector<TrexStream *> &streams);
+ const TrexStreamsGraphObj * generate(const std::vector<TrexStream *> &streams);
private:
@@ -200,7 +232,7 @@ private:
std::unordered_map<uint32_t, const TrexStream *> m_streams_hash;
/* main object to hold the graph - returned to the user */
- TrexStreamsGraphObj m_graph_obj;
+ TrexStreamsGraphObj *m_graph_obj;
};
#endif /* __TREX_STREAMS_COMPILER_H__ */
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index dd4937cd..585ff2c7 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -106,12 +106,56 @@ std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state)
}
+rte_mbuf_t * CGenNodeStateless::alloc_node_with_vm(){
+
+ rte_mbuf_t * m;
+ /* alloc small packet buffer*/
+ uint16_t prefix_size = prefix_header_size();
+ m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), prefix_size );
+ if (m==0) {
+ return (m);
+ }
+ /* TBD remove this, should handle cases of error */
+ assert(m);
+ char *p=rte_pktmbuf_append(m, prefix_size);
+ memcpy( p ,m_original_packet_data_prefix, prefix_size);
+
+
+ /* run the VM program */
+ StreamDPVmInstructionsRunner runner;
+
+ runner.run( m_vm_program_size,
+ m_vm_program,
+ m_vm_flow_var,
+ (uint8_t*)p);
+
+
+ rte_mbuf_t * m_const = get_const_mbuf();
+ if ( m_const != NULL) {
+ utl_rte_pktmbuf_add_after(m,m_const);
+ }
+ return (m);
+}
+
+
void CGenNodeStateless::free_stl_node(){
/* if we have cache mbuf free it */
rte_mbuf_t * m=get_cache_mbuf();
if (m) {
rte_pktmbuf_free(m);
m_cache_mbuf=0;
+ }else{
+ /* non cache - must have an header */
+ m=get_const_mbuf();
+ if (m) {
+ rte_pktmbuf_free(m); /* reduce the ref counter */
+ }
+ free_prefix_header();
+ }
+ if (m_vm_flow_var) {
+ /* free flow var */
+ free(m_vm_flow_var);
+ m_vm_flow_var=0;
}
}
@@ -139,7 +183,7 @@ bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
return (true);
}
-bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double mul) {
+bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) {
assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING ||
(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) );
@@ -148,7 +192,7 @@ bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double mul) {
CGenNodeStateless * node = dp_stream.m_node;
assert(node->get_port_id() == port_id);
- node->set_multiplier(mul);
+ node->update_rate(factor);
}
return (true);
@@ -423,6 +467,7 @@ TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
CGenNodeStateless *node = m_core->create_node_sl();
/* add periodic */
+ node->m_cache_mbuf=0;
node->m_type = CGenNode::STATELESS_PKT;
node->m_ref_stream_info = stream->clone_as_dp();
@@ -442,6 +487,10 @@ TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
node->m_flags = 0;
+ node->m_src_port =0;
+ node->m_original_packet_data_prefix = 0;
+
+
/* set socket id */
node->set_socket_id(m_core->m_node_gen.m_socket_id);
@@ -453,8 +502,7 @@ TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
node->m_pause =0;
node->m_stream_type = stream->m_type;
- node->m_base_pps = stream->get_pps();
- node->set_multiplier(comp->get_multiplier());
+ node->m_next_time_offset = 1.0 / stream->get_pps();
/* stateless specific fields */
switch ( stream->m_type ) {
@@ -487,23 +535,77 @@ TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
node->m_port_id = stream->m_port_id;
- /* allocate const mbuf */
- rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
- assert(m);
-
- char *p = rte_pktmbuf_append(m, pkt_size);
- assert(p);
- /* copy the packet */
- memcpy(p,stream_pkt,pkt_size);
-
/* set dir 0 or 1 client or server */
node->set_mbuf_cache_dir(dir);
- /* TBD repace the mac if req we should add flag */
- m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, m);
- /* set the packet as a readonly */
- node->set_cache_mbuf(m);
+ if (stream->is_vm() == false ) {
+ /* no VM */
+
+ node->m_vm_flow_var = NULL;
+ node->m_vm_program = NULL;
+ node->m_vm_program_size =0;
+
+ /* allocate const mbuf */
+ rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
+ assert(m);
+
+ char *p = rte_pktmbuf_append(m, pkt_size);
+ assert(p);
+ /* copy the packet */
+ memcpy(p,stream_pkt,pkt_size);
+
+ /* TBD repace the mac if req we should add flag */
+ m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,(uint8_t*) p);
+
+ /* set the packet as a readonly */
+ node->set_cache_mbuf(m);
+
+ node->m_original_packet_data_prefix =0;
+ }else{
+
+ /* set the program */
+ TrexStream * local_mem_stream = node->m_ref_stream_info;
+
+ StreamVmDp * lpDpVm = local_mem_stream->getDpVm();
+
+ node->m_vm_flow_var = lpDpVm->clone_bss(); /* clone the flow var */
+ node->m_vm_program = lpDpVm->get_program(); /* same ref to the program */
+ node->m_vm_program_size =lpDpVm->get_program_size();
+
+
+ /* we need to copy the object */
+ if ( pkt_size > stream->m_vm_prefix_size ) {
+ /* we need const packet */
+ uint16_t const_pkt_size = pkt_size - stream->m_vm_prefix_size ;
+ rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), const_pkt_size );
+ assert(m);
+
+ char *p = rte_pktmbuf_append(m, const_pkt_size);
+ assert(p);
+
+ /* copy packet data */
+ memcpy(p,(stream_pkt+ stream->m_vm_prefix_size),const_pkt_size);
+
+ node->set_const_mbuf(m);
+ }
+
+
+ if (stream->m_vm_prefix_size > pkt_size ) {
+ stream->m_vm_prefix_size = pkt_size;
+ }
+ /* copy the headr */
+ uint16_t header_size = stream->m_vm_prefix_size;
+ assert(header_size);
+ node->alloc_prefix_header(header_size);
+ uint8_t *p=node->m_original_packet_data_prefix;
+ assert(p);
+
+ memcpy(p,stream_pkt , header_size);
+ /* TBD repace the mac if req we should add flag */
+ m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, p);
+ }
+
CDpOneStream one_stream;
@@ -597,11 +699,11 @@ TrexStatelessDpCore::pause_traffic(uint8_t port_id){
}
void
-TrexStatelessDpCore::update_traffic(uint8_t port_id, double mul) {
+TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
- lp_port->update_traffic(port_id, mul);
+ lp_port->update_traffic(port_id, factor);
}
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index 563159b2..7dc4a2b2 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -68,7 +68,7 @@ public:
bool resume_traffic(uint8_t port_id);
- bool update_traffic(uint8_t port_id, double mul);
+ bool update_traffic(uint8_t port_id, double factor);
bool stop_traffic(uint8_t port_id,
bool stop_on_id,
diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h
index 5997376f..d33785fe 100644
--- a/src/stateless/dp/trex_stream_node.h
+++ b/src/stateless/dp/trex_stream_node.h
@@ -54,6 +54,16 @@ struct CGenNodeStateless : public CGenNodeBase {
friend class TrexStatelessDpCore;
public:
+
+ /* flags MASKS*/
+ enum {
+ SL_NODE_FLAGS_DIR =1, //USED by master
+ SL_NODE_FLAGS_MBUF_CACHE =2, //USED by master
+
+ SL_NODE_CONST_MBUF =4
+
+ };
+
enum {
ss_FREE_RESUSE =1, /* should be free by scheduler */
ss_INACTIVE =2, /* will be active by other stream or stopped */
@@ -83,14 +93,20 @@ private:
uint32_t m_multi_bursts; /* in case of multi_burst how many bursts */
/* cache line 1 */
- TrexStream * m_ref_stream_info; /* the stream info */
+ TrexStream * m_ref_stream_info; /* the stream info */
CGenNodeStateless * m_next_stream;
- double m_base_pps;
- /* pad to match the size of CGenNode */
- uint8_t m_pad_end[48];
+ uint8_t * m_original_packet_data_prefix; /* pointer to the original first pointer 64/128/512 */
+
+ /* Fast Field VM section */
+ uint8_t * m_vm_flow_var; /* pointer to the vm flow var */
+ uint8_t * m_vm_program; /* pointer to the program */
+ uint16_t m_vm_program_size; /* up to 64K op codes */
+ /* End Fast Field VM Section */
+ /* pad to match the size of CGenNode */
+ uint8_t m_pad_end[30];
public:
@@ -105,8 +121,9 @@ public:
* on the PPS and multiplier
*
*/
- void set_multiplier(double mul) {
- m_next_time_offset = 1.0 / (m_base_pps * mul) ;
+ void update_rate(double factor) {
+ /* update the inter packet gap */
+ m_next_time_offset = m_next_time_offset / factor;
}
/* we restart the stream, schedule it using stream isg */
@@ -256,13 +273,51 @@ public:
}
inline rte_mbuf_t * get_cache_mbuf(){
- if ( m_flags &NODE_FLAGS_MBUF_CACHE ) {
+ if ( m_flags & NODE_FLAGS_MBUF_CACHE ) {
+ return ((rte_mbuf_t *)m_cache_mbuf);
+ }else{
+ return ((rte_mbuf_t *)0);
+ }
+ }
+
+ inline void set_const_mbuf(rte_mbuf_t * m){
+ m_cache_mbuf=(void *)m;
+ m_flags |= SL_NODE_CONST_MBUF;
+ }
+
+ inline rte_mbuf_t * get_const_mbuf(){
+ if ( m_flags &SL_NODE_CONST_MBUF ) {
return ((rte_mbuf_t *)m_cache_mbuf);
}else{
return ((rte_mbuf_t *)0);
}
}
+ /* prefix header exits only in non cache mode size is 64/128/512 other are not possible right now */
+ inline void alloc_prefix_header(uint16_t size){
+ set_prefix_header_size(size);
+ m_original_packet_data_prefix = (uint8_t *)malloc(size);
+ assert(m_original_packet_data_prefix);
+ }
+
+ inline void free_prefix_header(){
+ if (m_original_packet_data_prefix) {
+ free(m_original_packet_data_prefix);
+ }
+ }
+
+ /* prefix headr could be 64/128/512 */
+ inline void set_prefix_header_size(uint16_t size){
+ m_src_port=size;
+ }
+
+ inline uint16_t prefix_header_size(){
+ return (m_src_port);
+ }
+
+
+ rte_mbuf_t * alloc_node_with_vm();
+
void free_stl_node();
public:
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index 3210f29a..257de168 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -168,14 +168,14 @@ TrexStatelessDpCanQuit::clone(){
************************/
bool
TrexStatelessDpUpdate::handle(TrexStatelessDpCore *dp_core) {
- dp_core->update_traffic(m_port_id, m_mul);
+ dp_core->update_traffic(m_port_id, m_factor);
return true;
}
TrexStatelessCpToDpMsgBase *
TrexStatelessDpUpdate::clone() {
- TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpUpdate(m_port_id, m_mul);
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpUpdate(m_port_id, m_factor);
return new_msg;
}
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index 7390be60..d56596bf 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -231,9 +231,9 @@ public:
class TrexStatelessDpUpdate : public TrexStatelessCpToDpMsgBase {
public:
- TrexStatelessDpUpdate(uint8_t port_id, double mul) {
+ TrexStatelessDpUpdate(uint8_t port_id, double factor) {
m_port_id = port_id;
- m_mul = mul;
+ m_factor = factor;
}
virtual bool handle(TrexStatelessDpCore *dp_core);
@@ -242,7 +242,7 @@ public:
private:
uint8_t m_port_id;
- double m_mul;
+ double m_factor;
};