summaryrefslogtreecommitdiffstats
path: root/src/stateless
diff options
context:
space:
mode:
authorDan Klein <danklein10@gmail.com>2015-11-24 08:54:53 +0200
committerDan Klein <danklein10@gmail.com>2015-11-24 08:54:53 +0200
commite7cb8b0f6c2fbe08d2086a7408040ac7d12aee5a (patch)
tree1b27e542fe9f3ae4abdc8245b804cda25a6e2c2f /src/stateless
parent597f74d8ed10abc3dd9df7e81ecea5ac2f5c714e (diff)
parentf3861d504353729724086dec82c79e818224554f (diff)
Merge branch 'master' into dan_stateless
Diffstat (limited to 'src/stateless')
-rw-r--r--src/stateless/cp/trex_dp_port_events.cpp220
-rw-r--r--src/stateless/cp/trex_dp_port_events.h171
-rw-r--r--src/stateless/cp/trex_stateless.cpp4
-rw-r--r--src/stateless/cp/trex_stateless.h9
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp239
-rw-r--r--src/stateless/cp/trex_stateless_port.h167
-rw-r--r--src/stateless/cp/trex_stream.cpp74
-rw-r--r--src/stateless/cp/trex_stream.h171
-rw-r--r--src/stateless/cp/trex_streams_compiler.cpp394
-rw-r--r--src/stateless/cp/trex_streams_compiler.h59
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp526
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h158
-rw-r--r--src/stateless/dp/trex_stream_node.h197
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp128
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h219
15 files changed, 2505 insertions, 231 deletions
diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp
new file mode 100644
index 00000000..ba327e59
--- /dev/null
+++ b/src/stateless/cp/trex_dp_port_events.cpp
@@ -0,0 +1,220 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include <trex_dp_port_events.h>
+#include <sstream>
+#include <os_time.h>
+#include <trex_stateless.h>
+
+/**
+ * port events
+ */
+void
+TrexDpPortEvents::create(TrexStatelessPort *port) {
+ m_port = port;
+
+ for (int i = 0; i < TrexDpPortEvent::EVENT_MAX; i++) {
+ m_events[i].create((TrexDpPortEvent::event_e) i, port);
+ }
+
+ m_event_id_counter = EVENT_ID_INVALID;
+}
+
+/**
+ * generate a new event ID
+ *
+ */
+int
+TrexDpPortEvents::generate_event_id() {
+ return (++m_event_id_counter);
+}
+
+/**
+ * mark the next allowed event
+ * all other events will be disabled
+ *
+ */
+void
+TrexDpPortEvents::wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms) {
+
+ /* first disable all events */
+ for (TrexDpPortEvent & e : m_events) {
+ e.disable();
+ }
+
+ /* mark this event as allowed */
+ m_events[ev].wait_for_event(event_id, timeout_ms);
+}
+
+void
+TrexDpPortEvents::disable(TrexDpPortEvent::event_e ev) {
+ m_events[ev].disable();
+}
+
+/**
+ * handle an event
+ *
+ */
+void
+TrexDpPortEvents::handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id) {
+ m_events[ev].handle_event(thread_id, event_id);
+}
+
+/***********
+ * single event object
+ *
+ */
+
+void
+TrexDpPortEvent::create(event_e type, TrexStatelessPort *port) {
+ m_event_type = type;
+ m_port = port;
+
+ /* add the core ids to the hash */
+ m_signal.clear();
+ for (int core_id : m_port->get_core_id_list()) {
+ m_signal[core_id] = false;
+ }
+
+ /* event is disabled */
+ disable();
+}
+
+
+/**
+ * wait the event using event id and timeout
+ *
+ */
+void
+TrexDpPortEvent::wait_for_event(int event_id, int timeout_ms) {
+
+ /* set a new event id */
+ m_event_id = event_id;
+
+ /* do we have a timeout ? */
+ if (timeout_ms > 0) {
+ m_expire_limit_ms = os_get_time_msec() + timeout_ms;
+ } else {
+ m_expire_limit_ms = -1;
+ }
+
+ /* prepare the signal array */
+ m_pending_cnt = 0;
+ for (auto & core_pair : m_signal) {
+ core_pair.second = false;
+ m_pending_cnt++;
+ }
+}
+
+void
+TrexDpPortEvent::disable() {
+ m_event_id = TrexDpPortEvents::EVENT_ID_INVALID;
+}
+
+/**
+ * get the event status
+ *
+ */
+
+TrexDpPortEvent::event_status_e
+TrexDpPortEvent::status() {
+
+ /* is it even active ? */
+ if (m_event_id == TrexDpPortEvents::EVENT_ID_INVALID) {
+ return (EVENT_DISABLE);
+ }
+
+ /* did it occured ? */
+ if (m_pending_cnt == 0) {
+ return (EVENT_OCCURED);
+ }
+
+ /* so we are enabled and the event did not occur - maybe we timed out ? */
+ if ( (m_expire_limit_ms > 0) && (os_get_time_msec() > m_expire_limit_ms) ) {
+ return (EVENT_TIMED_OUT);
+ }
+
+ /* so we are still waiting... */
+ return (EVENT_PENDING);
+
+}
+
+void
+TrexDpPortEvent::err(int thread_id, int event_id, const std::string &err_msg) {
+ std::stringstream err;
+ err << "DP event '" << event_name(m_event_type) << "' on thread id '" << thread_id << "' with key '" << event_id <<"' - ";
+}
+
+/**
+ * event occured
+ *
+ */
+void
+TrexDpPortEvent::handle_event(int thread_id, int event_id) {
+
+ /* if the event is disabled - we don't care */
+ if (!is_active()) {
+ return;
+ }
+
+ /* check the event id is matching the required event - if not maybe its an old signal */
+ if (event_id != m_event_id) {
+ return;
+ }
+
+ /* mark sure no double signal */
+ if (m_signal.at(thread_id)) {
+ err(thread_id, event_id, "double signal");
+
+ } else {
+ /* mark */
+ m_signal.at(thread_id) = true;
+ m_pending_cnt--;
+ }
+
+ /* event occured */
+ if (m_pending_cnt == 0) {
+ m_port->on_dp_event_occured(m_event_type);
+ m_event_id = TrexDpPortEvents::EVENT_ID_INVALID;
+ }
+}
+
+bool
+TrexDpPortEvent::is_active() {
+ return (status() != EVENT_DISABLE);
+}
+
+bool
+TrexDpPortEvent::has_timeout_expired() {
+ return (status() == EVENT_TIMED_OUT);
+}
+
+const char *
+TrexDpPortEvent::event_name(event_e type) {
+ switch (type) {
+ case EVENT_STOP:
+ return "DP STOP";
+
+ default:
+ throw TrexException("unknown event type");
+ }
+
+}
diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h
new file mode 100644
index 00000000..557e590b
--- /dev/null
+++ b/src/stateless/cp/trex_dp_port_events.h
@@ -0,0 +1,171 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#ifndef __TREX_DP_PORT_EVENTS_H__
+#define __TREX_DP_PORT_EVENTS_H__
+
+#include <unordered_map>
+#include <string>
+
+class TrexStatelessPort;
+
+/**
+ * describes a single DP event related to port
+ *
+ * @author imarom (18-Nov-15)
+ */
+class TrexDpPortEvent {
+public:
+
+ enum event_e {
+ EVENT_STOP = 1,
+ EVENT_MAX
+ };
+
+ /**
+ * status of the event for the port
+ */
+ enum event_status_e {
+ EVENT_DISABLE,
+ EVENT_PENDING,
+ EVENT_TIMED_OUT,
+ EVENT_OCCURED
+ };
+
+ /**
+ * init for the event
+ *
+ */
+ void create(event_e type, TrexStatelessPort *port);
+
+ /**
+ * create a new pending event
+ *
+ */
+ void wait_for_event(int event_id, int timeout_ms = -1);
+
+ /**
+ * mark event as not allowed to happen
+ *
+ */
+ void disable();
+
+ /**
+ * get the event status
+ *
+ */
+ event_status_e status();
+
+ /**
+ * event occured
+ *
+ */
+ void handle_event(int thread_id, int event_id);
+
+ /**
+ * returns true if event is active
+ *
+ */
+ bool is_active();
+
+ /**
+ * has timeout already expired ?
+ *
+ */
+ bool has_timeout_expired();
+
+ /**
+ * generate error
+ *
+ */
+ void err(int thread_id, int event_id, const std::string &err_msg);
+
+ /**
+ * event to name
+ *
+ */
+ static const char * event_name(event_e type);
+
+
+private:
+
+ event_e m_event_type;
+ std::unordered_map<int, bool> m_signal;
+ int m_pending_cnt;
+
+ TrexStatelessPort *m_port;
+ int m_event_id;
+ int m_expire_limit_ms;
+
+};
+
+/**
+ * all the events related to a port
+ *
+ */
+class TrexDpPortEvents {
+public:
+ friend class TrexDpPortEvent;
+
+ void create(TrexStatelessPort *port);
+
+ /**
+ * generate a new event ID to be used with wait_for_event
+ *
+ */
+ int generate_event_id();
+
+ /**
+ * wait a new DP event on the port
+ * returns a key which will be used to identify
+ * the event happened
+ *
+ * @author imarom (18-Nov-15)
+ *
+ * @param ev - type of event
+ * @param event_id - a unique identifier for the event
+ * @param timeout_ms - does it has a timeout ?
+ *
+ */
+ void wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms = -1);
+
+ /**
+ * disable an event (don't care)
+ *
+ */
+ void disable(TrexDpPortEvent::event_e ev);
+
+ /**
+ * event has occured
+ *
+ */
+ void handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id);
+
+private:
+ static const int EVENT_ID_INVALID = -1;
+
+ TrexDpPortEvent m_events[TrexDpPortEvent::EVENT_MAX];
+ int m_event_id_counter;
+
+ TrexStatelessPort *m_port;
+
+};
+
+#endif /* __TREX_DP_PORT_EVENTS_H__ */
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp
index e0e95450..a4522837 100644
--- a/src/stateless/cp/trex_stateless.cpp
+++ b/src/stateless/cp/trex_stateless.cpp
@@ -47,10 +47,12 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) {
m_port_count = cfg.m_port_count;
for (int i = 0; i < m_port_count; i++) {
- m_ports.push_back(new TrexStatelessPort(i));
+ m_ports.push_back(new TrexStatelessPort(i, cfg.m_platform_api));
}
m_platform_api = cfg.m_platform_api;
+ m_publisher = cfg.m_publisher;
+
}
/**
diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h
index 57c6ef1d..5c11be1e 100644
--- a/src/stateless/cp/trex_stateless.h
+++ b/src/stateless/cp/trex_stateless.h
@@ -30,6 +30,7 @@ limitations under the License.
#include <trex_stream.h>
#include <trex_stateless_port.h>
#include <trex_rpc_server_api.h>
+#include <publisher/trex_publisher.h>
#include <internal_api/trex_platform_api.h>
@@ -93,6 +94,7 @@ public:
m_rpc_async_cfg = NULL;
m_rpc_server_verbose = false;
m_platform_api = NULL;
+ m_publisher = NULL;
}
const TrexRpcServerConfig *m_rpc_req_resp_cfg;
@@ -100,6 +102,7 @@ public:
const TrexPlatformApi *m_platform_api;
bool m_rpc_server_verbose;
uint8_t m_port_count;
+ TrexPublisher *m_publisher;
};
/**
@@ -150,6 +153,10 @@ public:
return (m_platform_api);
}
+ TrexPublisher * get_publisher() {
+ return m_publisher;
+ }
+
const std::vector <TrexStatelessPort *> get_port_list() {
return m_ports;
}
@@ -170,6 +177,8 @@ protected:
/* platform API */
const TrexPlatformApi *m_platform_api;
+ TrexPublisher *m_publisher;
+
std::mutex m_global_cp_lock;
};
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index a0b57b63..40392e68 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -27,45 +27,89 @@ limitations under the License.
#include <string>
#ifndef TREX_RPC_MOCK_SERVER
+
// DPDK c++ issue
-#define UINT8_MAX 255
-#define UINT16_MAX 0xFFFF
+#ifndef UINT8_MAX
+ #define UINT8_MAX 255
+#endif
+
+#ifndef UINT16_MAX
+ #define UINT16_MAX 0xFFFF
+#endif
+
// DPDK c++ issue
#endif
#include <rte_ethdev.h>
#include <os_time.h>
+void
+port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list);
+
using namespace std;
/***************************
* trex stateless port
*
**************************/
-TrexStatelessPort::TrexStatelessPort(uint8_t port_id) : m_port_id(port_id) {
- m_port_state = PORT_STATE_UP_IDLE;
+TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) {
+ std::vector<std::pair<uint8_t, uint8_t>> core_pair_list;
+
+ m_port_id = port_id;
+
+ m_port_state = PORT_STATE_IDLE;
clear_owner();
+
+ /* get the DP cores belonging to this port */
+ api->port_id_to_cores(m_port_id, core_pair_list);
+
+ for (auto core_pair : core_pair_list) {
+
+ /* send the core id */
+ m_cores_id_list.push_back(core_pair.first);
+ }
+
+ /* init the events DP DB */
+ m_dp_events.create(this);
}
/**
- * starts the traffic on the port
+ * acquire the port
+ *
+ * @author imarom (09-Nov-15)
*
+ * @param user
+ * @param force
*/
-TrexStatelessPort::rc_e
-TrexStatelessPort::start_traffic(double mul) {
-
- if (m_port_state != PORT_STATE_UP_IDLE) {
- return (RC_ERR_BAD_STATE_FOR_OP);
+void
+TrexStatelessPort::acquire(const std::string &user, bool force) {
+ if ( (!is_free_to_aquire()) && (get_owner() != user) && (!force)) {
+ throw TrexRpcException("port is already taken by '" + get_owner() + "'");
}
- if (get_stream_table()->size() == 0) {
- return (RC_ERR_NO_STREAMS);
- }
+ set_owner(user);
+}
+
+void
+TrexStatelessPort::release(void) {
+ verify_state( ~(PORT_STATE_TX | PORT_STATE_PAUSE) );
+ clear_owner();
+}
+
+/**
+ * starts the traffic on the port
+ *
+ */
+void
+TrexStatelessPort::start_traffic(double mul, double duration) {
+
+ /* command allowed only on state stream */
+ verify_state(PORT_STATE_STREAMS);
/* fetch all the streams from the table */
vector<TrexStream *> streams;
- get_stream_table()->get_object_list(streams);
+ get_object_list(streams);
/* compiler it */
TrexStreamsCompiler compiler;
@@ -73,68 +117,121 @@ TrexStatelessPort::start_traffic(double mul) {
bool rc = compiler.compile(streams, *compiled_obj);
if (!rc) {
- return (RC_ERR_FAILED_TO_COMPILE_STREAMS);
+ throw TrexRpcException("Failed to compile streams");
}
/* generate a message to all the relevant DP cores to start transmitting */
- TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(compiled_obj);
+ int event_id = m_dp_events.generate_event_id();
+ /* mark that DP event of stoppped is possible */
+ m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, event_id);
- // FIXME (add the right core list)
- CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(0);
+ TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_obj, duration);
- ring->Enqueue((CGenNode *)start_msg);
+ m_last_all_streams_continues = compiled_obj->get_all_streams_continues();
+ m_last_duration =duration;
- /* move the state to transmiting */
- m_port_state = PORT_STATE_TRANSMITTING;
+ change_state(PORT_STATE_TX);
- return (RC_OK);
+ send_message_to_dp(start_msg);
+
}
-TrexStatelessPort::rc_e
+/**
+ * stop traffic on port
+ *
+ * @author imarom (09-Nov-15)
+ *
+ * @return TrexStatelessPort::rc_e
+ */
+void
TrexStatelessPort::stop_traffic(void) {
- /* real code goes here */
- if (m_port_state != PORT_STATE_TRANSMITTING) {
- return (RC_ERR_BAD_STATE_FOR_OP);
+ if (!( (m_port_state == PORT_STATE_TX)
+ || (m_port_state ==PORT_STATE_PAUSE) )) {
+ return;
}
+ /* mask out the DP stop event */
+ m_dp_events.disable(TrexDpPortEvent::EVENT_STOP);
+
/* generate a message to all the relevant DP cores to start transmitting */
TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id);
- // FIXME (add the right core list)
- CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(0);
+ send_message_to_dp(stop_msg);
- ring->Enqueue((CGenNode *)stop_msg);
+ change_state(PORT_STATE_STREAMS);
+
+}
- m_port_state = PORT_STATE_UP_IDLE;
+void
+TrexStatelessPort::pause_traffic(void) {
+
+ verify_state(PORT_STATE_TX);
- return (RC_OK);
+ if (m_last_all_streams_continues == false) {
+ throw TrexRpcException(" pause is supported when all streams are in continues mode ");
+ }
+
+ if ( m_last_duration>0.0 ) {
+ throw TrexRpcException(" pause is supported when duration is not enable is start command ");
+ }
+
+ TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpPause(m_port_id);
+
+ send_message_to_dp(stop_msg);
+
+ change_state(PORT_STATE_PAUSE);
}
-/**
-* access the stream table
-*
-*/
-TrexStreamTable * TrexStatelessPort::get_stream_table() {
- return &m_stream_table;
+void
+TrexStatelessPort::resume_traffic(void) {
+
+ verify_state(PORT_STATE_PAUSE);
+
+ /* generate a message to all the relevant DP cores to start transmitting */
+ TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpResume(m_port_id);
+
+ send_message_to_dp(stop_msg);
+
+ change_state(PORT_STATE_TX);
}
+void
+TrexStatelessPort::update_traffic(double mul) {
+
+ verify_state(PORT_STATE_STREAMS | PORT_STATE_TX | PORT_STATE_PAUSE);
+
+ #if 0
+ /* generate a message to all the relevant DP cores to start transmitting */
+ TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id);
+
+ send_message_to_dp(stop_msg);
+
+ m_port_state = PORT_STATE_UP_IDLE;
+ #endif
+}
std::string
-TrexStatelessPort::get_state_as_string() {
+TrexStatelessPort::get_state_as_string() const {
switch (get_state()) {
case PORT_STATE_DOWN:
- return "down";
+ return "DOWN";
+
+ case PORT_STATE_IDLE:
+ return "IDLE";
- case PORT_STATE_UP_IDLE:
- return "idle";
+ case PORT_STATE_STREAMS:
+ return "STREAMS";
- case PORT_STATE_TRANSMITTING:
- return "transmitting";
+ case PORT_STATE_TX:
+ return "TX";
+
+ case PORT_STATE_PAUSE:
+ return "PAUSE";
}
- return "unknown";
+ return "UNKNOWN";
}
void
@@ -145,6 +242,24 @@ TrexStatelessPort::get_properties(string &driver, string &speed) {
speed = "1 Gbps";
}
+bool
+TrexStatelessPort::verify_state(int state, bool should_throw) const {
+ if ( (state & m_port_state) == 0 ) {
+ if (should_throw) {
+ throw TrexRpcException("command cannot be executed on current state: '" + get_state_as_string() + "'");
+ } else {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+void
+TrexStatelessPort::change_state(port_state_e new_state) {
+
+ m_port_state = new_state;
+}
/**
* generate a random connection handler
@@ -191,3 +306,39 @@ TrexStatelessPort::encode_stats(Json::Value &port) {
port["tx_rx_errors"] = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors);
}
+void
+TrexStatelessPort::send_message_to_dp(TrexStatelessCpToDpMsgBase *msg) {
+
+ for (auto core_id : m_cores_id_list) {
+
+ /* send the message to the core */
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id);
+ ring->Enqueue((CGenNode *)msg->clone());
+ }
+
+}
+
+/**
+ * when a DP (async) event occurs - handle it
+ *
+ */
+void
+TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) {
+ Json::Value data;
+
+ switch (event_type) {
+
+ case TrexDpPortEvent::EVENT_STOP:
+ /* set a stop event */
+ change_state(PORT_STATE_STREAMS);
+ /* send a ZMQ event */
+
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data);
+ break;
+
+ default:
+ assert(0);
+
+ }
+}
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 3e071954..006ec97c 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -22,6 +22,10 @@ limitations under the License.
#define __TREX_STATELESS_PORT_H__
#include <trex_stream.h>
+#include <trex_dp_port_events.h>
+
+class TrexPlatformApi;
+class TrexStatelessCpToDpMsgBase;
/**
* describes a stateless port
@@ -29,15 +33,19 @@ limitations under the License.
* @author imarom (31-Aug-15)
*/
class TrexStatelessPort {
+ friend class TrexDpPortEvent;
+
public:
/**
* port state
*/
enum port_state_e {
- PORT_STATE_DOWN,
- PORT_STATE_UP_IDLE,
- PORT_STATE_TRANSMITTING
+ PORT_STATE_DOWN = 0x1,
+ PORT_STATE_IDLE = 0x2,
+ PORT_STATE_STREAMS = 0x4,
+ PORT_STATE_TX = 0x8,
+ PORT_STATE_PAUSE = 0x10,
};
/**
@@ -50,31 +58,55 @@ public:
RC_ERR_FAILED_TO_COMPILE_STREAMS
};
- TrexStatelessPort(uint8_t port_id);
+ TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api);
+
+ /**
+ * acquire port
+ * throws TrexException in case of an error
+ */
+ void acquire(const std::string &user, bool force = false);
+
+ /**
+ * release the port from the current user
+ * throws TrexException in case of an error
+ */
+ void release(void);
/**
* start traffic
- *
+ * throws TrexException in case of an error
*/
- rc_e start_traffic(double mul);
+ void start_traffic(double mul, double duration = -1);
/**
* stop traffic
- *
+ * throws TrexException in case of an error
+ */
+ void stop_traffic(void);
+
+ /**
+ * pause traffic
+ * throws TrexException in case of an error
+ */
+ void pause_traffic(void);
+
+ /**
+ * resume traffic
+ * throws TrexException in case of an error
*/
- rc_e stop_traffic(void);
+ void resume_traffic(void);
/**
- * access the stream table
+ * update current traffic on port
*
*/
- TrexStreamTable *get_stream_table();
+ void update_traffic(double mul);
/**
* get the port state
*
*/
- port_state_e get_state() {
+ port_state_e get_state() const {
return m_port_state;
}
@@ -82,7 +114,7 @@ public:
* port state as string
*
*/
- std::string get_state_as_string();
+ std::string get_state_as_string() const;
/**
* fill up properties of the port
@@ -94,6 +126,7 @@ public:
*/
void get_properties(std::string &driver, std::string &speed);
+
/**
* query for ownership
*
@@ -111,10 +144,75 @@ public:
return m_owner_handler;
}
- bool is_free_to_aquire() {
- return (m_owner == "none");
+
+ bool verify_owner_handler(const std::string &handler) {
+
+ return ( (m_owner != "none") && (m_owner_handler == handler) );
+
+ }
+
+ /**
+ * encode stats as JSON
+ */
+ void encode_stats(Json::Value &port);
+
+ uint8_t get_port_id() {
+ return m_port_id;
+ }
+
+ /**
+ * delegators
+ *
+ */
+
+ void add_stream(TrexStream *stream) {
+ verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS);
+
+ m_stream_table.add_stream(stream);
+
+ change_state(PORT_STATE_STREAMS);
}
+ void remove_stream(TrexStream *stream) {
+ verify_state(PORT_STATE_STREAMS);
+
+ m_stream_table.remove_stream(stream);
+
+ if (m_stream_table.size() == 0) {
+ change_state(PORT_STATE_IDLE);
+ }
+ }
+
+ void remove_and_delete_all_streams() {
+ verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS);
+
+ m_stream_table.remove_and_delete_all_streams();
+
+ change_state(PORT_STATE_IDLE);
+ }
+
+ TrexStream * get_stream_by_id(uint32_t stream_id) {
+ return m_stream_table.get_stream_by_id(stream_id);
+ }
+
+ void get_id_list(std::vector<uint32_t> &id_list) {
+ m_stream_table.get_id_list(id_list);
+ }
+
+ void get_object_list(std::vector<TrexStream *> &object_list) {
+ m_stream_table.get_object_list(object_list);
+ }
+
+ TrexDpPortEvents & get_dp_events() {
+ return m_dp_events;
+ }
+
+
+
+private:
+
+
+
/**
* take ownership of the server array
* this is static
@@ -131,30 +229,43 @@ public:
m_owner_handler = "";
}
- bool verify_owner_handler(const std::string &handler) {
+ bool is_free_to_aquire() {
+ return (m_owner == "none");
+ }
- return ( (m_owner != "none") && (m_owner_handler == handler) );
+ const std::vector<int> get_core_id_list () {
+ return m_cores_id_list;
}
+ bool verify_state(int state, bool should_throw = true) const;
+
+ void change_state(port_state_e new_state);
+
+ std::string generate_handler();
+
+ void send_message_to_dp(TrexStatelessCpToDpMsgBase *msg);
+
/**
- * encode stats as JSON
+ * triggered when event occurs
+ *
*/
- void encode_stats(Json::Value &port);
+ void on_dp_event_occured(TrexDpPortEvent::event_e event_type);
- uint8_t get_port_id() {
- return m_port_id;
- }
-private:
+ TrexStreamTable m_stream_table;
+ uint8_t m_port_id;
+ port_state_e m_port_state;
+ std::string m_owner;
+ std::string m_owner_handler;
- std::string generate_handler();
+ /* holds the DP cores associated with this port */
+ std::vector<int> m_cores_id_list;
+
+ bool m_last_all_streams_continues;
+ double m_last_duration;
- TrexStreamTable m_stream_table;
- uint8_t m_port_id;
- port_state_e m_port_state;
- std::string m_owner;
- std::string m_owner_handler;
+ TrexDpPortEvents m_dp_events;
};
#endif /* __TREX_STATELESS_PORT_H__ */
diff --git a/src/stateless/cp/trex_stream.cpp b/src/stateless/cp/trex_stream.cpp
index ba306137..5203b2a2 100644
--- a/src/stateless/cp/trex_stream.cpp
+++ b/src/stateless/cp/trex_stream.cpp
@@ -25,9 +25,76 @@ limitations under the License.
/**************************************
* stream
*************************************/
-TrexStream::TrexStream(uint8_t port_id, uint32_t stream_id) : m_port_id(port_id), m_stream_id(stream_id) {
+
+
+std::string TrexStream::get_stream_type_str(stream_type_t stream_type){
+
+ std::string res;
+
+
+ switch (stream_type) {
+
+ case stCONTINUOUS :
+ res="stCONTINUOUS ";
+ break;
+
+ case stSINGLE_BURST :
+ res="stSINGLE_BURST ";
+ break;
+
+ case stMULTI_BURST :
+ res="stMULTI_BURST ";
+ break;
+ default:
+ res="Unknow ";
+ };
+ return(res);
+}
+
+
+void TrexStream::Dump(FILE *fd){
+
+ fprintf(fd,"\n");
+ fprintf(fd,"==> Stream_id : %lu \n",(ulong)m_stream_id);
+ fprintf(fd," Enabled : %lu \n",(ulong)(m_enabled?1:0));
+ fprintf(fd," Self_start : %lu \n",(ulong)(m_self_start?1:0));
+
+ if (m_next_stream_id>=0) {
+ fprintf(fd," Nex_stream_id : %lu \n",(ulong)m_next_stream_id);
+ }else {
+ fprintf(fd," Nex_stream_id : %d \n",m_next_stream_id);
+ }
+
+ fprintf(fd," Port_id : %lu \n",(ulong)m_port_id);
+
+ if (m_isg_usec>0.0) {
+ fprintf(fd," isg : %6.2f \n",m_isg_usec);
+ }
+ fprintf(fd," type : %s \n",get_stream_type_str(m_type).c_str());
+
+ if ( m_type == TrexStream::stCONTINUOUS ) {
+ fprintf(fd," pps : %f \n",m_pps);
+ }
+ if (m_type == TrexStream::stSINGLE_BURST) {
+ fprintf(fd," pps : %f \n",m_pps);
+ fprintf(fd," burst : %lu \n",(ulong)m_burst_total_pkts);
+ }
+ if (m_type == TrexStream::stMULTI_BURST) {
+ fprintf(fd," pps : %f \n",m_pps);
+ fprintf(fd," burst : %lu \n",(ulong)m_burst_total_pkts);
+ fprintf(fd," mburst : %lu \n",(ulong)m_num_bursts);
+ if (m_ibg_usec>0.0) {
+ fprintf(fd," m_ibg_usec : %f \n",m_ibg_usec);
+ }
+ }
+}
+
+
+TrexStream::TrexStream(uint8_t type,
+ uint8_t port_id, uint32_t stream_id) : m_port_id(port_id), m_stream_id(stream_id) {
/* default values */
+ m_type = type;
m_isg_usec = 0;
m_next_stream_id = -1;
m_enabled = false;
@@ -38,6 +105,11 @@ TrexStream::TrexStream(uint8_t port_id, uint32_t stream_id) : m_port_id(port_id)
m_rx_check.m_enable = false;
+
+ m_pps=-1.0;
+ m_burst_total_pkts=0;
+ m_num_bursts=1;
+ m_ibg_usec=0.0;
}
TrexStream::~TrexStream() {
diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h
index c8a15240..0634829e 100644
--- a/src/stateless/cp/trex_stream.h
+++ b/src/stateless/cp/trex_stream.h
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -29,9 +30,28 @@ limitations under the License.
#include <json/json.h>
#include <trex_stream_vm.h>
+#include <stdio.h>
+#include <string.h>
class TrexRpcCmdAddStream;
+
+struct CStreamPktData {
+ uint8_t *binary;
+ uint16_t len;
+
+ std::string meta;
+
+public:
+ inline void clone(uint8_t * in_binary,
+ uint32_t in_pkt_size){
+ binary = new uint8_t[in_pkt_size];
+ len = in_pkt_size;
+ memcpy(binary,in_binary,in_pkt_size);
+ }
+};
+
+
/**
* Stateless Stream
*
@@ -39,8 +59,20 @@ class TrexRpcCmdAddStream;
class TrexStream {
public:
- TrexStream(uint8_t port_id, uint32_t stream_id);
- virtual ~TrexStream() = 0;
+ enum STREAM_TYPE {
+ stNONE = 0,
+ stCONTINUOUS = 4,
+ stSINGLE_BURST = 5,
+ stMULTI_BURST = 6
+ };
+
+ typedef uint8_t stream_type_t ;
+
+ static std::string get_stream_type_str(stream_type_t stream_type);
+
+public:
+ TrexStream(uint8_t type,uint8_t port_id, uint32_t stream_id);
+ virtual ~TrexStream();
/* defines the min max per packet supported */
static const uint32_t MIN_PKT_SIZE_BYTES = 1;
@@ -52,10 +84,78 @@ public:
/* access the stream json */
const Json::Value & get_stream_json();
+ /* compress the stream id to be zero based */
+ void fix_dp_stream_id(uint32_t my_stream_id,int next_stream_id){
+ m_stream_id = my_stream_id;
+ m_next_stream_id = next_stream_id;
+ }
+
+ double get_pps() {
+ return m_pps;
+ }
+
+ void set_pps(double pps){
+ m_pps = pps;
+ }
+
+ void set_type(uint8_t type){
+ m_type = type;
+ }
+
+ uint8_t get_type(void) const {
+ return ( m_type );
+ }
+
+ bool is_dp_next_stream(){
+ if (m_next_stream_id<0) {
+ return (false);
+ }else{
+ return (true);
+ }
+ }
+
+
+
+ void set_multi_burst(uint32_t burst_total_pkts,
+ uint32_t num_bursts,
+ double ibg_usec) {
+ m_burst_total_pkts = burst_total_pkts;
+ m_num_bursts = num_bursts;
+ m_ibg_usec = ibg_usec;
+ }
+
+ void set_single_burst(uint32_t burst_total_pkts){
+ set_multi_burst(burst_total_pkts,1,0.0);
+ }
+
+ /* create new stream */
+ TrexStream * clone_as_dp(){
+ TrexStream * dp=new TrexStream(m_type,m_port_id,m_stream_id);
+
+
+ dp->m_isg_usec = m_isg_usec;
+ dp->m_next_stream_id = m_next_stream_id;
+
+ dp->m_enabled = m_enabled;
+ dp->m_self_start = m_self_start;
+
+ /* deep copy */
+ dp->m_pkt.clone(m_pkt.binary,m_pkt.len);
+
+ dp->m_rx_check = m_rx_check;
+ dp->m_pps = m_pps;
+ dp->m_burst_total_pkts = m_burst_total_pkts;
+ dp->m_num_bursts = m_num_bursts;
+ dp->m_ibg_usec = m_ibg_usec ;
+ return (dp);
+ }
+
+ void Dump(FILE *fd);
public:
/* basic */
+ uint8_t m_type;
uint8_t m_port_id;
- uint32_t m_stream_id;
+ uint32_t m_stream_id; /* id from RPC can be anything */
/* config fields */
@@ -65,13 +165,9 @@ public:
/* indicators */
bool m_enabled;
bool m_self_start;
-
+
+ CStreamPktData m_pkt;
/* pkt */
- struct {
- uint8_t *binary;
- uint16_t len;
- std::string meta;
- } m_pkt;
/* VM */
StreamVm m_vm;
@@ -85,64 +181,19 @@ public:
} m_rx_check;
+ double m_pps;
- /* original template provided by requester */
- Json::Value m_stream_json;
-};
+ uint32_t m_burst_total_pkts; /* valid in case of burst stSINGLE_BURST,stMULTI_BURST*/
-/**
- * continuous stream
- *
- */
-class TrexStreamContinuous : public TrexStream {
-public:
- TrexStreamContinuous(uint8_t port_id, uint32_t stream_id, double pps) : TrexStream(port_id, stream_id), m_pps(pps) {
- }
+ uint32_t m_num_bursts; /* valid in case of stMULTI_BURST */
- double get_pps() {
- return m_pps;
- }
+ double m_ibg_usec; /* valid in case of stMULTI_BURST */
-protected:
- double m_pps;
-};
-
-/**
- * single burst
- *
- */
-class TrexStreamBurst : public TrexStream {
-public:
- TrexStreamBurst(uint8_t port_id, uint32_t stream_id, uint32_t total_pkts, double pps) :
- TrexStream(port_id, stream_id),
- m_total_pkts(total_pkts),
- m_pps(pps) {
- }
+ /* original template provided by requester */
+ Json::Value m_stream_json;
-protected:
- uint32_t m_total_pkts;
- double m_pps;
};
-/**
- * multi burst
- *
- */
-class TrexStreamMultiBurst : public TrexStreamBurst {
-public:
- TrexStreamMultiBurst(uint8_t port_id,
- uint32_t stream_id,
- uint32_t pkts_per_burst,
- double pps,
- uint32_t num_bursts,
- double ibg_usec) : TrexStreamBurst(port_id, stream_id, pkts_per_burst, pps), m_num_bursts(num_bursts), m_ibg_usec(ibg_usec) {
-
- }
-protected:
- uint32_t m_num_bursts;
- double m_ibg_usec;
-
-};
/**
* holds all the streams
diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp
index 5e2602ec..302863ae 100644
--- a/src/stateless/cp/trex_streams_compiler.cpp
+++ b/src/stateless/cp/trex_streams_compiler.cpp
@@ -19,42 +19,390 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-#include <string.h>
+#include <string>
+#include <sstream>
#include <trex_streams_compiler.h>
#include <trex_stream.h>
+#include <assert.h>
+#include <trex_stateless.h>
+#include <iostream>
+
+/**
+ * describes a graph node in the pre compile check
+ *
+ * @author imarom (16-Nov-15)
+ */
+class GraphNode {
+public:
+ GraphNode(TrexStream *stream, GraphNode *next) : m_stream(stream), m_next(next) {
+ marked = false;
+ m_compressed_stream_id=-1;
+ }
+
+ uint32_t get_stream_id() const {
+ return m_stream->m_stream_id;
+ }
+
+ const TrexStream *m_stream;
+ GraphNode *m_next;
+ std::vector<const GraphNode *> m_parents;
+ bool marked;
+ int m_compressed_stream_id;
+};
+
+/**
+ * node map
+ *
+ */
+class GraphNodeMap {
+public:
+
+ GraphNodeMap() : m_dead_end(NULL, NULL) {
+
+ }
+
+ bool add(GraphNode *node) {
+ if (has(node->get_stream_id())) {
+ return false;
+ }
+
+ m_nodes[node->get_stream_id()] = node;
+
+ if (node->m_stream->m_self_start) {
+ m_roots.push_back(node);
+ }
+
+ return true;
+ }
+
+ bool has(uint32_t stream_id) {
+
+ return (get(stream_id) != NULL);
+ }
+
+ GraphNode * get(uint32_t stream_id) {
+
+ if (stream_id == -1) {
+ return &m_dead_end;
+ }
+
+ auto search = m_nodes.find(stream_id);
+
+ if (search != m_nodes.end()) {
+ return search->second;
+ } else {
+ return NULL;
+ }
+ }
+
+ void clear_marks() {
+ for (auto node : m_nodes) {
+ node.second->marked = false;
+ }
+ }
+
+ void get_unmarked(std::vector <GraphNode *> &unmarked) {
+ for (auto node : m_nodes) {
+ if (!node.second->marked) {
+ unmarked.push_back(node.second);
+ }
+ }
+ }
+
+
+ ~GraphNodeMap() {
+ for (auto node : m_nodes) {
+ delete node.second;
+ }
+ m_nodes.clear();
+ }
+
+ std::vector <GraphNode *> & get_roots() {
+ return m_roots;
+ }
+
+
+ std::unordered_map<uint32_t, GraphNode *> get_nodes() {
+ return m_nodes;
+ }
+
+private:
+ std::unordered_map<uint32_t, GraphNode *> m_nodes;
+ std::vector <GraphNode *> m_roots;
+ GraphNode m_dead_end;
+};
/**************************************
* stream compiled object
*************************************/
TrexStreamsCompiledObj::TrexStreamsCompiledObj(uint8_t port_id, double mul) : m_port_id(port_id), m_mul(mul) {
+ m_all_continues=false;
}
TrexStreamsCompiledObj::~TrexStreamsCompiledObj() {
- for (auto &obj : m_objs) {
- delete obj.m_pkt;
+ for (auto obj : m_objs) {
+ delete obj.m_stream;
}
m_objs.clear();
}
+
void
-TrexStreamsCompiledObj::add_compiled_stream(double pps, uint8_t *pkt, uint16_t pkt_len) {
+TrexStreamsCompiledObj::add_compiled_stream(TrexStream * stream){
+
obj_st obj;
- obj.m_port_id = m_port_id;
- obj.m_pps = pps * m_mul;
- obj.m_pkt_len = pkt_len;
+ obj.m_stream = stream->clone_as_dp();
+
+ m_objs.push_back(obj);
+}
+
+void
+TrexStreamsCompiledObj::add_compiled_stream(TrexStream * stream,
+ uint32_t my_dp_id, int next_dp_id) {
+ obj_st obj;
- obj.m_pkt = new uint8_t[pkt_len];
- memcpy(obj.m_pkt, pkt, pkt_len);
+ obj.m_stream = stream->clone_as_dp();
+ /* compress the id's*/
+ obj.m_stream->fix_dp_stream_id(my_dp_id,next_dp_id);
m_objs.push_back(obj);
}
+void TrexStreamsCompiledObj::Dump(FILE *fd){
+ for (auto obj : m_objs) {
+ obj.m_stream->Dump(fd);
+ }
+}
+
+
+
+TrexStreamsCompiledObj *
+TrexStreamsCompiledObj::clone() {
+
+ /* use multiplier of 1 to avoid double mult */
+ TrexStreamsCompiledObj *new_compiled_obj = new TrexStreamsCompiledObj(m_port_id, 1);
+
+ /**
+ * clone each element
+ */
+ for (auto obj : m_objs) {
+ new_compiled_obj->add_compiled_stream(obj.m_stream);
+ }
+
+ new_compiled_obj->m_mul = m_mul;
+
+ return new_compiled_obj;
+}
+
+void
+TrexStreamsCompiler::add_warning(const std::string &warning) {
+ m_warnings.push_back("*** warning: " + warning);
+}
+
+void
+TrexStreamsCompiler::err(const std::string &err) {
+ throw TrexException("*** error: " + err);
+}
+
+void
+TrexStreamsCompiler::check_stream(const TrexStream *stream) {
+ std::stringstream ss;
+
+ /* cont. stream can point only on itself */
+ if (stream->get_type() == TrexStream::stCONTINUOUS) {
+ if (stream->m_next_stream_id != -1) {
+ ss << "continous stream '" << stream->m_stream_id << "' cannot point on another stream";
+ err(ss.str());
+ }
+ }
+}
+
+void
+TrexStreamsCompiler::allocate_pass(const std::vector<TrexStream *> &streams,
+ GraphNodeMap *nodes) {
+ std::stringstream ss;
+ uint32_t compressed_stream_id=0;
+
+
+ /* first pass - allocate all nodes and check for duplicates */
+ for (auto stream : streams) {
+
+ /* skip non enabled streams */
+ if (!stream->m_enabled) {
+ continue;
+ }
+
+ /* sanity check on the stream itself */
+ check_stream(stream);
+
+ /* duplicate stream id ? */
+ if (nodes->has(stream->m_stream_id)) {
+ ss << "duplicate instance of stream id " << stream->m_stream_id;
+ err(ss.str());
+ }
+
+ GraphNode *node = new GraphNode(stream, NULL);
+ /* allocate new compressed id */
+ node->m_compressed_stream_id = compressed_stream_id;
+
+ compressed_stream_id++;
+
+ /* add to the map */
+ assert(nodes->add(node));
+ }
+
+}
+
+/**
+ * on this pass we direct the graph to point to the right nodes
+ *
+ */
+void
+TrexStreamsCompiler::direct_pass(GraphNodeMap *nodes) {
+
+ /* second pass - direct the graph */
+ for (auto p : nodes->get_nodes()) {
+
+ GraphNode *node = p.second;
+ const TrexStream *stream = node->m_stream;
+
+ /* check the stream points on an existing stream */
+ GraphNode *next_node = nodes->get(stream->m_next_stream_id);
+ if (!next_node) {
+ std::stringstream ss;
+ ss << "stream " << node->get_stream_id() << " is pointing on non existent stream " << stream->m_next_stream_id;
+ err(ss.str());
+ }
+
+ node->m_next = next_node;
+
+ /* do we have more than one parent ? */
+ next_node->m_parents.push_back(node);
+ }
+
+
+ /* check for multiple parents */
+ for (auto p : nodes->get_nodes()) {
+ GraphNode *node = p.second;
+
+ if (node->m_parents.size() > 0 ) {
+ std::stringstream ss;
+
+ ss << "stream " << node->get_stream_id() << " is triggered by multiple streams: ";
+ for (auto x : node->m_parents) {
+ ss << x->get_stream_id() << " ";
+ }
+
+ add_warning(ss.str());
+ }
+ }
+}
+
+/**
+ * mark sure all the streams are reachable
+ *
+ */
+void
+TrexStreamsCompiler::check_for_unreachable_streams(GraphNodeMap *nodes) {
+ /* start with the roots */
+ std::vector <GraphNode *> next_nodes = nodes->get_roots();
+
+
+ nodes->clear_marks();
+
+ /* run BFS from all the roots */
+ while (!next_nodes.empty()) {
+
+ /* pull one */
+ GraphNode *node = next_nodes.back();
+ next_nodes.pop_back();
+ if (node->marked) {
+ continue;
+ }
+
+ node->marked = true;
+
+ if (node->m_next != NULL) {
+ next_nodes.push_back(node->m_next);
+ }
+
+ }
+
+ std::vector <GraphNode *> unmarked;
+ nodes->get_unmarked(unmarked);
+
+ if (!unmarked.empty()) {
+ std::stringstream ss;
+ for (auto node : unmarked) {
+ ss << "stream " << node->get_stream_id() << " is unreachable from any other stream\n";
+ }
+ err(ss.str());
+ }
+
+
+}
+
+/**
+ * check validation of streams for compile
+ *
+ * @author imarom (16-Nov-15)
+ *
+ * @param streams
+ * @param fail_msg
+ *
+ * @return bool
+ */
+void
+TrexStreamsCompiler::pre_compile_check(const std::vector<TrexStream *> &streams,
+ GraphNodeMap & nodes) {
+
+ m_warnings.clear();
+
+ /* allocate nodes */
+ allocate_pass(streams, &nodes);
+
+ /* direct the graph */
+ direct_pass(&nodes);
+
+ /* check for non reachable streams inside the graph */
+ check_for_unreachable_streams(&nodes);
+
+}
+
/**************************************
* stream compiler
*************************************/
bool
-TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj) {
+TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams,
+ TrexStreamsCompiledObj &obj,
+ std::string *fail_msg) {
+
+#if 0
+ fprintf(stdout,"------------pre compile \n");
+ for (auto stream : streams) {
+ stream->Dump(stdout);
+ }
+ fprintf(stdout,"------------pre compile \n");
+#endif
+
+ GraphNodeMap nodes;
+
+
+ /* compile checks */
+ try {
+ pre_compile_check(streams,nodes);
+ } catch (const TrexException &ex) {
+ if (fail_msg) {
+ *fail_msg = ex.what();
+ } else {
+ std::cout << ex.what();
+ }
+ return false;
+ }
+
+
+ bool all_continues=true;
/* for now we do something trivial, */
for (auto stream : streams) {
@@ -62,24 +410,26 @@ TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams, TrexStrea
if (!stream->m_enabled) {
continue;
}
-
- /* for now skip also non self started streams */
- if (!stream->m_self_start) {
- continue;
+ if (stream->get_type() != TrexStream::stCONTINUOUS ) {
+ all_continues=false;
}
- /* for now support only continous ... */
- TrexStreamContinuous *cont_stream = dynamic_cast<TrexStreamContinuous *>(stream);
- if (!cont_stream) {
- continue;
+ int new_id= nodes.get(stream->m_stream_id)->m_compressed_stream_id;
+ assert(new_id>=0);
+ uint32_t my_stream_id = (uint32_t)new_id;
+ int my_next_stream_id=-1;
+ if (stream->m_next_stream_id>=0) {
+ my_next_stream_id=nodes.get(stream->m_next_stream_id)->m_compressed_stream_id;
}
/* add it */
- obj.add_compiled_stream(cont_stream->get_pps(),
- cont_stream->m_pkt.binary,
- cont_stream->m_pkt.len);
+ obj.add_compiled_stream(stream,
+ my_stream_id,
+ my_next_stream_id
+ );
}
-
+ obj.m_all_continues =all_continues;
return true;
}
+
diff --git a/src/stateless/cp/trex_streams_compiler.h b/src/stateless/cp/trex_streams_compiler.h
index 06f992ed..17ca3c74 100644
--- a/src/stateless/cp/trex_streams_compiler.h
+++ b/src/stateless/cp/trex_streams_compiler.h
@@ -23,9 +23,11 @@ limitations under the License.
#include <stdint.h>
#include <vector>
+#include <string>
class TrexStreamsCompiler;
class TrexStream;
+class GraphNodeMap;
/**
* compiled object for a table of streams
@@ -40,33 +42,78 @@ public:
~TrexStreamsCompiledObj();
struct obj_st {
- double m_pps;
- uint8_t *m_pkt;
- uint16_t m_pkt_len;
- uint8_t m_port_id;
+
+ TrexStream * m_stream;
};
const std::vector<obj_st> & get_objects() {
return m_objs;
}
+ uint8_t get_port_id(){
+ return (m_port_id);
+ }
+
+ /**
+ * clone the compiled object
+ *
+ */
+ TrexStreamsCompiledObj * clone();
+
+ double get_multiplier(){
+ return (m_mul);
+ }
+
+ bool get_all_streams_continues(){
+ return (m_all_continues);
+ }
+
+ void Dump(FILE *fd);
+
private:
- void add_compiled_stream(double pps, uint8_t *pkt, uint16_t pkt_len);
+ void add_compiled_stream(TrexStream * stream,
+ uint32_t my_dp_id, int next_dp_id);
+ void add_compiled_stream(TrexStream * stream);
+
std::vector<obj_st> m_objs;
+ bool m_all_continues;
uint8_t m_port_id;
double m_mul;
};
class TrexStreamsCompiler {
public:
+
/**
* compiles a vector of streams to an object passable to the DP
*
* @author imarom (28-Oct-15)
*
*/
- bool compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj);
+ bool compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj, std::string *fail_msg = NULL);
+
+ /**
+ *
+ * returns a reference pointer to the last compile warnings
+ * if no warnings were produced - the vector is empty
+ */
+ const std::vector<std::string> & get_last_compile_warnings() {
+ return m_warnings;
+ }
+
+private:
+
+ void pre_compile_check(const std::vector<TrexStream *> &streams,
+ GraphNodeMap & nodes);
+ void allocate_pass(const std::vector<TrexStream *> &streams, GraphNodeMap *nodes);
+ void direct_pass(GraphNodeMap *nodes);
+ void check_for_unreachable_streams(GraphNodeMap *nodes);
+ void check_stream(const TrexStream *stream);
+ void add_warning(const std::string &warning);
+ void err(const std::string &err);
+
+ std::vector<std::string> m_warnings;
};
#endif /* __TREX_STREAMS_COMPILER_H__ */
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index 306b23d0..9b4a6ad9 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -22,12 +23,193 @@ limitations under the License.
#include <trex_stateless_messaging.h>
#include <trex_streams_compiler.h>
#include <trex_stream_node.h>
+#include <trex_stream.h>
#include <bp_sim.h>
-TrexStatelessDpCore::TrexStatelessDpCore(uint8_t thread_id, CFlowGenListPerThread *core) {
+
+void CDpOneStream::Delete(CFlowGenListPerThread * core){
+ assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
+ core->free_node((CGenNode *)m_node);
+ delete m_dp_stream;
+ m_node=0;
+ m_dp_stream=0;
+}
+
+void CDpOneStream::DeleteOnlyStream(){
+ assert(m_dp_stream);
+ delete m_dp_stream;
+ m_dp_stream=0;
+}
+
+int CGenNodeStateless::get_stream_id(){
+ if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
+ return (-1); // not valid
+ }
+ assert(m_ref_stream_info);
+ return ((int)m_ref_stream_info->m_stream_id);
+}
+
+
+void CGenNodeStateless::DumpHeader(FILE *fd){
+ fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
+
+}
+void CGenNodeStateless::Dump(FILE *fd){
+ fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu \n",
+ m_time,
+ (ulong)m_port_id,
+ "s-pkt", //action
+ get_stream_state_str(m_state ).c_str(),
+ get_stream_id(), //stream_id
+ TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
+ (ulong)m_multi_bursts,
+ (ulong)m_single_burst
+ );
+}
+
+
+void CGenNodeStateless::refresh(){
+
+ /* refill the stream info */
+ m_single_burst = m_single_burst_refill;
+ m_multi_bursts = m_ref_stream_info->m_num_bursts;
+ m_state = CGenNodeStateless::ss_ACTIVE;
+}
+
+
+void CGenNodeCommand::free_command(){
+
+ assert(m_cmd);
+ m_cmd->on_node_remove();
+ delete m_cmd;
+}
+
+
+std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
+ std::string res;
+
+ switch (stream_state) {
+ case CGenNodeStateless::ss_FREE_RESUSE :
+ res="FREE ";
+ break;
+ case CGenNodeStateless::ss_INACTIVE :
+ res="INACTIVE ";
+ break;
+ case CGenNodeStateless::ss_ACTIVE :
+ res="ACTIVE ";
+ break;
+ default:
+ res="Unknow ";
+ };
+ return(res);
+}
+
+
+void CGenNodeStateless::free_stl_node(){
+ /* if we have cache mbuf free it */
+ rte_mbuf_t * m=get_cache_mbuf();
+ if (m) {
+ rte_pktmbuf_free(m);
+ m_cache_mbuf=0;
+ }
+}
+
+
+bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
+ m_active_streams-=d; /* reduce the number of streams */
+ if (m_active_streams == 0) {
+ return (true);
+ }
+ return (false);
+}
+
+bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
+
+ /* we are working with continues streams so we must be in transmit mode */
+ assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
+
+ for (auto dp_stream : m_active_nodes) {
+ CGenNodeStateless * node =dp_stream.m_node;
+ assert(node->get_port_id() == port_id);
+ assert(node->is_pause() == true);
+ node->set_pause(false);
+ }
+ m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
+ return (true);
+}
+
+
+bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
+
+ /* we are working with continues streams so we must be in transmit mode */
+ assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
+
+ for (auto dp_stream : m_active_nodes) {
+ CGenNodeStateless * node =dp_stream.m_node;
+ assert(node->get_port_id() == port_id);
+ assert(node->is_pause() == false);
+ node->set_pause(true);
+ }
+ m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
+ return (true);
+}
+
+
+bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
+ bool stop_on_id,
+ int event_id){
+
+
+ if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
+ assert(m_active_streams==0);
+ return false;
+ }
+
+ /* there could be race of stop after stop */
+ if ( stop_on_id ) {
+ if (event_id != m_event_id){
+ /* we can't stop it is an old message */
+ return false;
+ }
+ }
+
+ for (auto dp_stream : m_active_nodes) {
+ CGenNodeStateless * node =dp_stream.m_node;
+ assert(node->get_port_id() == port_id);
+ if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
+ node->mark_for_free();
+ m_active_streams--;
+ dp_stream.DeleteOnlyStream();
+
+ }else{
+ dp_stream.Delete(m_core);
+ }
+ }
+
+ /* active stream should be zero */
+ assert(m_active_streams==0);
+ m_active_nodes.clear();
+ m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
+ return (true);
+}
+
+
+void TrexStatelessDpPerPort::create(CFlowGenListPerThread * core){
+ m_core=core;
+ m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
+ m_port_id=0;
+ m_active_streams=0;
+ m_active_nodes.clear();
+}
+
+
+
+void
+TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
m_thread_id = thread_id;
m_core = core;
+ m_local_port_offset = 2*core->getDualPortId();
CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
@@ -35,8 +217,54 @@ TrexStatelessDpCore::TrexStatelessDpCore(uint8_t thread_id, CFlowGenListPerThrea
m_ring_to_cp = cp_dp->getRingDpToCp(thread_id);
m_state = STATE_IDLE;
+
+ int i;
+ for (i=0; i<NUM_PORTS_PER_CORE; i++) {
+ m_ports[i].create(core);
+ }
}
+
+/* move to the next stream, old stream move to INACTIVE */
+bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
+ CGenNodeStateless * next_node){
+
+ assert(cur_node);
+ TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
+ bool schedule =false;
+
+ bool to_stop_port=false;
+
+ if (next_node == NULL) {
+ /* there is no next stream , reduce the number of active streams*/
+ to_stop_port = lp_port->update_number_of_active_streams(1);
+
+ }else{
+ uint8_t state=next_node->get_state();
+
+ /* can't be FREE_RESUSE */
+ assert(state != CGenNodeStateless::ss_FREE_RESUSE);
+ if (next_node->get_state() == CGenNodeStateless::ss_INACTIVE ) {
+
+ /* refill start info and scedule, no update in active streams */
+ next_node->refresh();
+ schedule = true;
+
+ }else{
+ to_stop_port = lp_port->update_number_of_active_streams(1);
+ }
+ }
+
+ if ( to_stop_port ) {
+ /* call stop port explictly to move the state */
+ stop_traffic(cur_node->m_port_id,false,0);
+ }
+
+ return ( schedule );
+}
+
+
+
/**
* in idle state loop, the processor most of the time sleeps
* and periodically checks for messages
@@ -52,6 +280,15 @@ TrexStatelessDpCore::idle_state_loop() {
}
}
+
+
+void TrexStatelessDpCore::quit_main_loop(){
+ m_core->set_terminate_mode(true); /* mark it as terminated */
+ m_state = STATE_TERMINATE;
+ add_global_duration(0.0001);
+}
+
+
/**
* scehduler runs when traffic exists
* it will return when no more transmitting is done on this
@@ -68,37 +305,172 @@ TrexStatelessDpCore::start_scheduler() {
m_core->m_node_gen.add_node(node_sync);
double old_offset = 0.0;
- m_core->m_node_gen.flush_file(100000000, 0.0, false, m_core, old_offset);
+ m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
+ /* bail out in case of terminate */
+ if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
+ m_core->m_node_gen.close_file(m_core);
+ }
}
+
+void
+TrexStatelessDpCore::run_once(){
+
+ idle_state_loop();
+
+ if ( m_state == STATE_TERMINATE ){
+ return;
+ }
+
+ start_scheduler();
+}
+
+
+
+
void
TrexStatelessDpCore::start() {
while (true) {
- idle_state_loop();
+ run_once();
+
+ if ( m_core->is_terminated_by_master() ) {
+ break;
+ }
+ }
+}
+
+/* only if both port are idle we can exit */
+void
+TrexStatelessDpCore::schedule_exit(){
+
+ CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
+
+ node->m_type = CGenNode::COMMAND;
- start_scheduler();
+ node->m_cmd = new TrexStatelessDpCanQuit();
+
+ /* make sure it will be scheduled after the current node */
+ node->m_time = m_core->m_cur_time_sec ;
+
+ m_core->m_node_gen.add_node((CGenNode *)node);
+}
+
+
+void
+TrexStatelessDpCore::add_global_duration(double duration){
+ if (duration > 0.0) {
+ CGenNode *node = m_core->create_node() ;
+
+ node->m_type = CGenNode::EXIT_SCHED;
+
+ /* make sure it will be scheduled after the current node */
+ node->m_time = m_core->m_cur_time_sec + duration ;
+
+ m_core->m_node_gen.add_node(node);
}
}
+/* add per port exit */
void
-TrexStatelessDpCore::add_cont_stream(double pps, const uint8_t *pkt, uint16_t pkt_len) {
+TrexStatelessDpCore::add_port_duration(double duration,
+ uint8_t port_id,
+ int event_id){
+ if (duration > 0.0) {
+ CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
+
+ node->m_type = CGenNode::COMMAND;
+
+ /* make sure it will be scheduled after the current node */
+ node->m_time = m_core->m_cur_time_sec + duration ;
+
+ TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
+
+
+ /* test this */
+ m_core->m_non_active_nodes++;
+ cmd->set_core_ptr(m_core);
+ cmd->set_event_id(event_id);
+ cmd->set_wait_for_event_id(true);
+
+ node->m_cmd = cmd;
+
+ m_core->m_node_gen.add_node((CGenNode *)node);
+ }
+}
+
+
+void
+TrexStatelessDpCore::add_cont_stream(TrexStatelessDpPerPort * lp_port,
+ TrexStream * stream,
+ TrexStreamsCompiledObj *comp) {
+
CGenNodeStateless *node = m_core->create_node_sl();
/* add periodic */
node->m_type = CGenNode::STATELESS_PKT;
- node->m_time = m_core->m_cur_time_sec + 0.0 /* STREAM ISG */;
+
+ node->m_ref_stream_info = stream->clone_as_dp();
+
+ node->m_next_stream=0; /* will be fixed later */
+
+
+ if ( stream->m_self_start ){
+ /* if self start it is in active mode */
+ node->m_state =CGenNodeStateless::ss_ACTIVE;
+ lp_port->m_active_streams++;
+ }else{
+ node->m_state =CGenNodeStateless::ss_INACTIVE;
+ }
+
+ node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);
+
+ pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
node->m_flags = 0;
/* set socket id */
node->set_socket_id(m_core->m_node_gen.m_socket_id);
/* build a mbuf from a packet */
- uint16_t pkt_size = pkt_len;
- const uint8_t *stream_pkt = pkt;
+
+ uint16_t pkt_size = stream->m_pkt.len;
+ const uint8_t *stream_pkt = stream->m_pkt.binary;
+
+ node->m_pause =0;
+ node->m_stream_type = stream->m_type;
+ node->m_next_time_offset = 1.0 / (stream->get_pps() * comp->get_multiplier()) ;
+
+
+ /* stateless specific fields */
+ switch ( stream->m_type ) {
+
+ case TrexStream::stCONTINUOUS :
+ node->m_single_burst=0;
+ node->m_single_burst_refill=0;
+ node->m_multi_bursts=0;
+ node->m_ibg_sec = 0.0;
+ break;
- node->m_next_time_offset = 1.0 / pps;
- node->m_is_stream_active = 1;
+ case TrexStream::stSINGLE_BURST :
+ node->m_stream_type = TrexStream::stMULTI_BURST;
+ node->m_single_burst = stream->m_burst_total_pkts;
+ node->m_single_burst_refill = stream->m_burst_total_pkts;
+ node->m_multi_bursts = 1; /* single burst in multi burst of 1 */
+ node->m_ibg_sec = 0.0;
+ break;
+
+ case TrexStream::stMULTI_BURST :
+ node->m_single_burst = stream->m_burst_total_pkts;
+ node->m_single_burst_refill = stream->m_burst_total_pkts;
+ node->m_multi_bursts = stream->m_num_bursts;
+ node->m_ibg_sec = usec_to_sec( stream->m_ibg_usec );
+ break;
+ default:
+
+ assert(0);
+ };
+
+ node->m_port_id = stream->m_port_id;
/* allocate const mbuf */
rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
@@ -110,7 +482,6 @@ TrexStatelessDpCore::add_cont_stream(double pps, const uint8_t *pkt, uint16_t pk
memcpy(p,stream_pkt,pkt_size);
/* set dir 0 or 1 client or server */
- pkt_dir_t dir = 0;
node->set_mbuf_cache_dir(dir);
/* TBD repace the mac if req we should add flag */
@@ -119,55 +490,130 @@ TrexStatelessDpCore::add_cont_stream(double pps, const uint8_t *pkt, uint16_t pk
/* set the packet as a readonly */
node->set_cache_mbuf(m);
- /* keep track */
- m_active_nodes.push_back(node);
+ CDpOneStream one_stream;
- /* schedule */
- m_core->m_node_gen.add_node((CGenNode *)node);
+ one_stream.m_dp_stream = node->m_ref_stream_info;
+ one_stream.m_node =node;
- m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
+ lp_port->m_active_nodes.push_back(one_stream);
+ /* schedule only if active */
+ if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
+ m_core->m_node_gen.add_node((CGenNode *)node);
+ }
}
void
-TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj) {
+TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
+ double duration,
+ int event_id) {
+
+
+ TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
+ lp_port->m_active_streams = 0;
+ lp_port->set_event_id(event_id);
+
+ /* no nodes in the list */
+ assert(lp_port->m_active_nodes.size()==0);
+
+ for (auto single_stream : obj->get_objects()) {
+ /* all commands should be for the same port */
+ assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
+ add_cont_stream(lp_port,single_stream.m_stream,obj);
+ }
+
+ uint32_t nodes = lp_port->m_active_nodes.size();
+ /* find next stream */
+ assert(nodes == obj->get_objects().size());
+
+ int cnt=0;
+
+ /* set the next_stream pointer */
for (auto single_stream : obj->get_objects()) {
- add_cont_stream(single_stream.m_pps, single_stream.m_pkt, single_stream.m_pkt_len);
+
+ if (single_stream.m_stream->is_dp_next_stream() ) {
+ int stream_id = single_stream.m_stream->m_next_stream_id;
+ assert(stream_id<nodes);
+ /* point to the next stream , stream_id is fixed */
+ lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
+ }
+ cnt++;
+ }
+
+ lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
+ m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
+
+
+ if ( duration > 0.0 ){
+ add_port_duration( duration ,obj->get_port_id(),event_id );
}
+
}
-void
-TrexStatelessDpCore::stop_traffic(uint8_t port_id) {
- /* we cannot remove nodes not from the top of the queue so
- for every active node - make sure next time
- the scheduler invokes it, it will be free */
- for (auto node : m_active_nodes) {
- if (node->m_port_id == port_id) {
- node->m_is_stream_active = 0;
+
+bool TrexStatelessDpCore::are_all_ports_idle(){
+
+ bool res=true;
+ int i;
+ for (i=0; i<NUM_PORTS_PER_CORE; i++) {
+ if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
+ res=false;
}
}
+ return (res);
+}
- /* remove all the non active nodes */
- auto pred = std::remove_if(m_active_nodes.begin(),
- m_active_nodes.end(),
- [](CGenNodeStateless *node) { return (!node->m_is_stream_active); });
- m_active_nodes.erase(pred, m_active_nodes.end());
+void
+TrexStatelessDpCore::resume_traffic(uint8_t port_id){
+
+ TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
+
+ lp_port->resume_traffic(port_id);
+}
- if (m_active_nodes.size() == 0) {
- m_state = STATE_IDLE;
- /* stop the scheduler */
- CGenNode *node = m_core->create_node() ;
+void
+TrexStatelessDpCore::pause_traffic(uint8_t port_id){
- node->m_type = CGenNode::EXIT_SCHED;
+ TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
- /* make sure it will be scheduled after the current node */
- node->m_time = m_core->m_node_gen.m_p_queue.top()->m_time;
+ lp_port->pause_traffic(port_id);
+}
- m_core->m_node_gen.add_node(node);
+
+void
+TrexStatelessDpCore::stop_traffic(uint8_t port_id,
+ bool stop_on_id,
+ int event_id) {
+ /* we cannot remove nodes not from the top of the queue so
+ for every active node - make sure next time
+ the scheduler invokes it, it will be free */
+
+ TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
+
+ if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
+ /* nothing to do ! already stopped */
+ //printf(" skip .. %f\n",m_core->m_cur_time_sec);
+ return;
}
-
+
+#if 0
+ if ( are_all_ports_idle() ) {
+ /* just a place holder if we will need to do somthing in that case */
+ }
+#endif
+
+ /* inform the control plane we stopped - this might be a async stop
+ (streams ended)
+ */
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
+ TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
+ port_id,
+ TrexDpPortEvent::EVENT_STOP,
+ lp_port->get_event_id());
+ ring->Enqueue((CGenNode *)event_msg);
+
}
/**
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index 698cac2f..eda1ae59 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -31,6 +32,74 @@ class TrexStatelessDpStart;
class CFlowGenListPerThread;
class CGenNodeStateless;
class TrexStreamsCompiledObj;
+class TrexStream;
+
+
+class CDpOneStream {
+public:
+ void Create(){
+ }
+
+ void Delete(CFlowGenListPerThread * core);
+ void DeleteOnlyStream();
+
+ CGenNodeStateless * m_node; // schedule node
+ TrexStream * m_dp_stream; // stream info
+};
+
+class TrexStatelessDpPerPort {
+
+public:
+ /* states */
+ enum state_e {
+ ppSTATE_IDLE,
+ ppSTATE_TRANSMITTING,
+ ppSTATE_PAUSE
+
+ };
+
+public:
+ TrexStatelessDpPerPort(){
+ }
+
+ void create(CFlowGenListPerThread * core);
+
+ bool pause_traffic(uint8_t port_id);
+
+ bool resume_traffic(uint8_t port_id);
+
+ bool stop_traffic(uint8_t port_id,
+ bool stop_on_id,
+ int event_id);
+
+ bool update_number_of_active_streams(uint32_t d);
+
+ state_e get_state() {
+ return m_state;
+ }
+
+ void set_event_id(int event_id) {
+ m_event_id = event_id;
+ }
+
+ int get_event_id() {
+ return m_event_id;
+ }
+
+public:
+
+ state_e m_state;
+ uint8_t m_port_id;
+
+ uint32_t m_active_streams; /* how many active streams on this port */
+
+ std::vector<CDpOneStream> m_active_nodes; /* holds the current active nodes */
+ CFlowGenListPerThread * m_core ;
+ int m_event_id;
+};
+
+/* for now */
+#define NUM_PORTS_PER_CORE 2
class TrexStatelessDpCore {
@@ -39,10 +108,24 @@ public:
/* states */
enum state_e {
STATE_IDLE,
- STATE_TRANSMITTING
+ STATE_TRANSMITTING,
+ STATE_TERMINATE
+
};
- TrexStatelessDpCore(uint8_t thread_id, CFlowGenListPerThread *core);
+ TrexStatelessDpCore() {
+ m_thread_id = 0;
+ m_core = NULL;
+ m_duration = -1;
+ }
+
+ /**
+ * "static constructor"
+ *
+ * @param thread_id
+ * @param core
+ */
+ void create(uint8_t thread_id, CFlowGenListPerThread *core);
/**
* launch the stateless DP core code
@@ -50,6 +133,10 @@ public:
*/
void start();
+
+ /* exit after batch of commands */
+ void run_once();
+
/**
* dummy traffic creator
*
@@ -58,13 +145,30 @@ public:
* @param pkt
* @param pkt_len
*/
- void start_traffic(TrexStreamsCompiledObj *obj);
+ void start_traffic(TrexStreamsCompiledObj *obj,
+ double duration,
+ int m_event_id);
+
+
+ /* pause the streams, work only if all are continues */
+ void pause_traffic(uint8_t port_id);
+
+
+
+ void resume_traffic(uint8_t port_id);
+
/**
+ *
* stop all traffic for this core
*
*/
- void stop_traffic(uint8_t port_id);
+ void stop_traffic(uint8_t port_id,bool stop_on_id, int event_id);
+
+
+ /* return if all ports are idel */
+ bool are_all_ports_idle();
+
/**
* check for and handle messages from CP
@@ -92,7 +196,31 @@ public:
}
+ /* quit the main loop, work in both stateless in stateful, don't free memory trigger from master */
+ void quit_main_loop();
+
+ state_e get_state() {
+ return m_state;
+ }
+
+ bool set_stateless_next_node(CGenNodeStateless * cur_node,
+ CGenNodeStateless * next_node);
+
+
+ TrexStatelessDpPerPort * get_port_db(uint8_t port_id){
+ assert((m_local_port_offset==port_id) ||(m_local_port_offset+1==port_id));
+ uint8_t local_port_id = port_id -m_local_port_offset;
+ assert(local_port_id<NUM_PORTS_PER_CORE);
+ return (&m_ports[local_port_id]);
+ }
+
+
+
private:
+
+ void schedule_exit();
+
+
/**
* in idle state loop, the processor most of the time sleeps
* and periodically checks for messages
@@ -115,18 +243,30 @@ private:
*/
void handle_cp_msg(TrexStatelessCpToDpMsgBase *msg);
- void add_cont_stream(double pps, const uint8_t *pkt, uint16_t pkt_len);
+
+ void add_port_duration(double duration,
+ uint8_t port_id,
+ int event_id);
+
+ void add_global_duration(double duration);
+
+ void add_cont_stream(TrexStatelessDpPerPort * lp_port,
+ TrexStream * stream,
+ TrexStreamsCompiledObj *comp);
uint8_t m_thread_id;
- state_e m_state;
+ uint8_t m_local_port_offset;
+
+ state_e m_state; /* state of all ports */
CNodeRing *m_ring_from_cp;
CNodeRing *m_ring_to_cp;
- /* holds the current active nodes */
- std::vector<CGenNodeStateless *> m_active_nodes;
+ TrexStatelessDpPerPort m_ports[NUM_PORTS_PER_CORE];
/* pointer to the main object */
- CFlowGenListPerThread *m_core;
+ CFlowGenListPerThread * m_core;
+
+ double m_duration;
};
#endif /* __TREX_STATELESS_DP_CORE_H__ */
diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h
index 92b428ab..ccf99eaa 100644
--- a/src/stateless/dp/trex_stream_node.h
+++ b/src/stateless/dp/trex_stream_node.h
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoh Haim
Cisco Systems, Inc.
*/
@@ -22,44 +23,201 @@ limitations under the License.
#define __TREX_STREAM_NODE_H__
#include <bp_sim.h>
+#include <stdio.h>
class TrexStatelessDpCore;
+#include <trex_stream.h>
+
+class TrexStatelessCpToDpMsgBase;
+class CFlowGenListPerThread;
+
+struct CGenNodeCommand : public CGenNodeBase {
+
+friend class TrexStatelessDpCore;
+
+public:
+ TrexStatelessCpToDpMsgBase * m_cmd;
+
+ uint8_t m_pad_end[104];
+
+public:
+ void free_command();
+
+} __rte_cache_aligned;;
+
+
+static_assert(sizeof(CGenNodeCommand) == sizeof(CGenNode), "sizeof(CGenNodeCommand) != sizeof(CGenNode)" );
+
/* this is a event for stateless */
struct CGenNodeStateless : public CGenNodeBase {
friend class TrexStatelessDpCore;
+public:
+ enum {
+ ss_FREE_RESUSE =1, /* should be free by scheduler */
+ ss_INACTIVE =2, /* will be active by other stream or stopped */
+ ss_ACTIVE =3 /* the stream is active */
+ };
+ typedef uint8_t stream_state_t ;
+
+ static std::string get_stream_state_str(stream_state_t stream_state);
+
private:
+ /* cache line 0 */
+ /* important stuff here */
void * m_cache_mbuf;
- double m_next_time_offset;
- uint8_t m_is_stream_active;
+ double m_next_time_offset; /* in sec */
+ double m_ibg_sec; /* inter burst time in sec */
+
+
+ stream_state_t m_state;
uint8_t m_port_id;
+ uint8_t m_stream_type; /* see TrexStream::STREAM_TYPE ,stream_type_t */
+ uint8_t m_pause;
+
+ uint32_t m_single_burst; /* the number of bursts in case of burst */
+ uint32_t m_single_burst_refill;
+
+ uint32_t m_multi_bursts; /* in case of multi_burst how many bursts */
+
+ /* cache line 1 */
+ TrexStream * m_ref_stream_info; /* the stream info */
+ CGenNodeStateless * m_next_stream;
/* pad to match the size of CGenNode */
- uint8_t m_pad_end[87];
+ uint8_t m_pad_end[56];
+
+
public:
- inline bool is_active() {
- return m_is_stream_active;
+ uint8_t get_port_id(){
+ return (m_port_id);
+ }
+
+
+ /* we restart the stream, schedule it using stream isg */
+ inline void update_refresh_time(double cur_time){
+ m_time = cur_time + usec_to_sec(m_ref_stream_info->m_isg_usec);
+ }
+
+ inline bool is_mask_for_free(){
+ return (get_state() == CGenNodeStateless::ss_FREE_RESUSE ?true:false);
+
+ }
+ inline void mark_for_free(){
+ set_state(CGenNodeStateless::ss_FREE_RESUSE);
+ /* only to be safe */
+ m_ref_stream_info= NULL;
+ m_next_stream= NULL;
+ }
+
+ bool is_pause(){
+ return (m_pause==1?true:false);
+ }
+
+ void set_pause(bool enable){
+ if ( enable ){
+ m_pause=1;
+ }else{
+ m_pause=0;
+ }
+ }
+
+ inline uint8_t get_stream_type(){
+ return (m_stream_type);
+ }
+
+ inline uint32_t get_single_burst_cnt(){
+ return (m_single_burst);
+ }
+
+ inline double get_multi_ibg_sec(){
+ return (m_ibg_sec);
+ }
+
+ inline uint32_t get_multi_burst_cnt(){
+ return (m_multi_bursts);
+ }
+
+ inline void set_state(stream_state_t new_state){
+ m_state=new_state;
+ }
+
+
+ inline stream_state_t get_state() {
+ return m_state;
+ }
+
+ void refresh();
+
+ inline void handle_continues(CFlowGenListPerThread *thread) {
+
+ if (unlikely (is_pause()==false)) {
+ thread->m_node_gen.m_v_if->send_node( (CGenNode *)this);
+ }
+
+ /* in case of continues */
+ m_time += m_next_time_offset;
+
+ /* insert a new event */
+ thread->m_node_gen.m_p_queue.push( (CGenNode *)this);
+ }
+
+ inline void handle_multi_burst(CFlowGenListPerThread *thread) {
+ thread->m_node_gen.m_v_if->send_node( (CGenNode *)this);
+
+ m_single_burst--;
+ if (m_single_burst > 0 ) {
+ /* in case of continues */
+ m_time += m_next_time_offset;
+
+ thread->m_node_gen.m_p_queue.push( (CGenNode *)this);
+ }else{
+ m_multi_bursts--;
+ if ( m_multi_bursts == 0 ) {
+ set_state(CGenNodeStateless::ss_INACTIVE);
+ if ( thread->set_stateless_next_node(this,m_next_stream) ){
+ /* update the next stream time using isg */
+ m_next_stream->update_refresh_time(m_time);
+
+ thread->m_node_gen.m_p_queue.push( (CGenNode *)m_next_stream);
+ }else{
+ // in case of zero we will schedule a command to stop
+ // will be called from set_stateless_next_node
+ }
+
+ }else{
+ m_time += m_ibg_sec;
+ m_single_burst = m_single_burst_refill;
+ thread->m_node_gen.m_p_queue.push( (CGenNode *)this);
+ }
+ }
}
/**
* main function to handle an event of a packet tx
*
+ *
+ *
*/
- inline void handle(CFlowGenListPerThread *thread) {
- thread->m_node_gen.m_v_if->send_node( (CGenNode *)this);
+ inline void handle(CFlowGenListPerThread *thread) {
- /* in case of continues */
- m_time += m_next_time_offset;
+ if (m_stream_type == TrexStream::stCONTINUOUS ) {
+ handle_continues(thread) ;
+ }else{
+ if (m_stream_type == TrexStream::stMULTI_BURST) {
+ handle_multi_burst(thread);
+ }else{
+ assert(0);
+ }
+ }
- /* insert a new event */
- thread->m_node_gen.m_p_queue.push( (CGenNode *)this);
}
void set_socket_id(socket_id_t socket){
@@ -82,8 +240,6 @@ public:
return ((pkt_dir_t)( m_flags &1));
}
-
-
inline void set_cache_mbuf(rte_mbuf_t * m){
m_cache_mbuf=(void *)m;
m_flags |= NODE_FLAGS_MBUF_CACHE;
@@ -97,9 +253,22 @@ public:
}
}
+ void free_stl_node();
+
+public:
+ /* debug functions */
+
+ int get_stream_id();
+
+ static void DumpHeader(FILE *fd);
+
+ void Dump(FILE *fd);
} __rte_cache_aligned;
-static_assert(sizeof(CGenNodeStateless) == sizeof(CGenNode), "sizeof(CGenNodeStateless) != sizeof(CGenNode)");
+static_assert(sizeof(CGenNodeStateless) == sizeof(CGenNode), "sizeof(CGenNodeStateless) != sizeof(CGenNode)" );
+
+
+
#endif /* __TREX_STREAM_NODE_H__ */
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index 3e754649..ec8b7839 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -21,12 +22,34 @@ limitations under the License.
#include <trex_stateless_messaging.h>
#include <trex_stateless_dp_core.h>
#include <trex_streams_compiler.h>
+#include <trex_stateless.h>
+#include <bp_sim.h>
+
#include <string.h>
/*************************
start traffic message
************************/
-TrexStatelessDpStart::TrexStatelessDpStart(TrexStreamsCompiledObj *obj) : m_obj(obj) {
+TrexStatelessDpStart::TrexStatelessDpStart(uint8_t port_id, int event_id, TrexStreamsCompiledObj *obj, double duration) {
+ m_port_id = port_id;
+ m_event_id = event_id;
+ m_obj = obj;
+ m_duration = duration;
+}
+
+
+/**
+ * clone for DP start message
+ *
+ */
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpStart::clone() {
+
+ TrexStreamsCompiledObj *new_obj = m_obj->clone();
+
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpStart(m_port_id, m_event_id, new_obj, m_duration);
+
+ return new_msg;
}
TrexStatelessDpStart::~TrexStatelessDpStart() {
@@ -38,7 +61,9 @@ TrexStatelessDpStart::~TrexStatelessDpStart() {
bool
TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) {
- dp_core->start_traffic(m_obj);
+ /* staet traffic */
+ dp_core->start_traffic(m_obj, m_duration,m_event_id);
+
return true;
}
@@ -47,7 +72,104 @@ TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) {
************************/
bool
TrexStatelessDpStop::handle(TrexStatelessDpCore *dp_core) {
- dp_core->stop_traffic(m_port_id);
+
+
+ dp_core->stop_traffic(m_port_id,m_stop_only_for_event_id,m_event_id);
return true;
}
+
+void TrexStatelessDpStop::on_node_remove(){
+ if ( m_core ) {
+ assert(m_core->m_non_active_nodes>0);
+ m_core->m_non_active_nodes--;
+ }
+}
+
+
+TrexStatelessCpToDpMsgBase * TrexStatelessDpPause::clone(){
+
+ TrexStatelessDpPause *new_msg = new TrexStatelessDpPause(m_port_id);
+ return new_msg;
+}
+
+
+bool TrexStatelessDpPause::handle(TrexStatelessDpCore *dp_core){
+ dp_core->pause_traffic(m_port_id);
+ return (true);
+}
+
+
+
+TrexStatelessCpToDpMsgBase * TrexStatelessDpResume::clone(){
+ TrexStatelessDpResume *new_msg = new TrexStatelessDpResume(m_port_id);
+ return new_msg;
+}
+
+bool TrexStatelessDpResume::handle(TrexStatelessDpCore *dp_core){
+ dp_core->resume_traffic(m_port_id);
+ return (true);
+}
+
+
+/**
+ * clone for DP stop message
+ *
+ */
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpStop::clone() {
+ TrexStatelessDpStop *new_msg = new TrexStatelessDpStop(m_port_id);
+
+ new_msg->set_event_id(m_event_id);
+ new_msg->set_wait_for_event_id(m_stop_only_for_event_id);
+ /* set back pointer to master */
+ new_msg->set_core_ptr(m_core);
+
+ return new_msg;
+}
+
+
+
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpQuit::clone(){
+
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpQuit();
+
+ return new_msg;
+}
+
+
+bool TrexStatelessDpQuit::handle(TrexStatelessDpCore *dp_core){
+
+ /* quit */
+ dp_core->quit_main_loop();
+ return (true);
+}
+
+bool TrexStatelessDpCanQuit::handle(TrexStatelessDpCore *dp_core){
+
+ if ( dp_core->are_all_ports_idle() ){
+ /* if all ports are idle quit now */
+ set_quit(true);
+ }
+ return (true);
+}
+
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpCanQuit::clone(){
+
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpCanQuit();
+
+ return new_msg;
+}
+
+
+/************************* messages from DP to CP **********************/
+bool
+TrexDpPortEventMsg::handle() {
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(m_port_id);
+ port->get_dp_events().handle_event(m_event_type, m_thread_id, m_event_id);
+
+ return (true);
+}
+
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index 381e146d..6bd0dbe3 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -22,9 +23,11 @@ limitations under the License.
#define __TREX_STATELESS_MESSAGING_H__
#include <msg_manager.h>
+#include <trex_dp_port_events.h>
class TrexStatelessDpCore;
class TrexStreamsCompiledObj;
+class CFlowGenListPerThread;
/**
* defines the base class for CP to DP messages
@@ -35,16 +38,40 @@ class TrexStatelessCpToDpMsgBase {
public:
TrexStatelessCpToDpMsgBase() {
+ m_quit_scheduler=false;
}
virtual ~TrexStatelessCpToDpMsgBase() {
}
+
+ virtual bool handle(TrexStatelessDpCore *dp_core) = 0;
+
/**
- * virtual function to handle a message
+ * clone the current message
*
*/
- virtual bool handle(TrexStatelessDpCore *dp_core) = 0;
+ virtual TrexStatelessCpToDpMsgBase * clone() = 0;
+
+ /* do we want to quit scheduler, can be set by handle function */
+ void set_quit(bool enable){
+ m_quit_scheduler=enable;
+ }
+
+ bool is_quit(){
+ return ( m_quit_scheduler);
+ }
+
+ /* this node is called from scheduler in case the node is free */
+ virtual void on_node_remove(){
+ }
+
+ /* no copy constructor */
+ TrexStatelessCpToDpMsgBase(TrexStatelessCpToDpMsgBase &) = delete;
+
+protected:
+ int m_event_id;
+ bool m_quit_scheduler;
};
/**
@@ -55,16 +82,59 @@ public:
class TrexStatelessDpStart : public TrexStatelessCpToDpMsgBase {
public:
- TrexStatelessDpStart(TrexStreamsCompiledObj *obj);
+ TrexStatelessDpStart(uint8_t m_port_id, int m_event_id, TrexStreamsCompiledObj *obj, double duration);
~TrexStatelessDpStart();
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
virtual bool handle(TrexStatelessDpCore *dp_core);
private:
+
+ uint8_t m_port_id;
+ int m_event_id;
TrexStreamsCompiledObj *m_obj;
+ double m_duration;
+
};
+class TrexStatelessDpPause : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpPause(uint8_t port_id) : m_port_id(port_id) {
+ }
+
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+
+private:
+ uint8_t m_port_id;
+};
+
+
+class TrexStatelessDpResume : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpResume(uint8_t port_id) : m_port_id(port_id) {
+ }
+
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+
+private:
+ uint8_t m_port_id;
+};
+
+
/**
* a message to stop traffic
*
@@ -74,13 +144,156 @@ class TrexStatelessDpStop : public TrexStatelessCpToDpMsgBase {
public:
TrexStatelessDpStop(uint8_t port_id) : m_port_id(port_id) {
+ m_stop_only_for_event_id=false;
+ m_event_id=0;
+ m_core = NULL;
}
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+
virtual bool handle(TrexStatelessDpCore *dp_core);
+ void set_core_ptr(CFlowGenListPerThread * core){
+ m_core = core;
+ }
+
+ CFlowGenListPerThread * get_core_ptr(){
+ return ( m_core);
+ }
+
+
+ void set_event_id(int event_id){
+ m_event_id = event_id;
+ }
+
+ void set_wait_for_event_id(bool wait){
+ m_stop_only_for_event_id = wait;
+ }
+
+ virtual void on_node_remove();
+
+
+ bool get_is_stop_by_event_id(){
+ return (m_stop_only_for_event_id);
+ }
+
+ int get_event_id(){
+ return (m_event_id);
+ }
+
private:
uint8_t m_port_id;
+ bool m_stop_only_for_event_id;
+ int m_event_id;
+ CFlowGenListPerThread * m_core ;
+
+};
+
+/**
+ * a message to Quit the datapath traffic. support only stateless for now
+ *
+ * @author hhaim
+ */
+class TrexStatelessDpQuit : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpQuit() {
+ }
+
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
};
+/**
+ * a message to check if both port are idel and exit
+ *
+ * @author hhaim
+ */
+class TrexStatelessDpCanQuit : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpCanQuit() {
+ }
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+};
+
+
+
+/************************* messages from DP to CP **********************/
+
+/**
+ * defines the base class for CP to DP messages
+ *
+ * @author imarom (27-Oct-15)
+ */
+class TrexStatelessDpToCpMsgBase {
+public:
+
+ TrexStatelessDpToCpMsgBase() {
+ }
+
+ virtual ~TrexStatelessDpToCpMsgBase() {
+ }
+
+ /**
+ * virtual function to handle a message
+ *
+ */
+ virtual bool handle() = 0;
+
+ /* no copy constructor */
+ TrexStatelessDpToCpMsgBase(TrexStatelessDpToCpMsgBase &) = delete;
+
+};
+
+
+/**
+ * a message indicating an event has happened on a port at the
+ * DP
+ *
+ */
+class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase {
+public:
+
+ TrexDpPortEventMsg(int thread_id, uint8_t port_id, TrexDpPortEvent::event_e type, int event_id) {
+ m_thread_id = thread_id;
+ m_port_id = port_id;
+ m_event_type = type;
+ m_event_id = event_id;
+ }
+
+ virtual bool handle();
+
+ int get_thread_id() {
+ return m_thread_id;
+ }
+
+ uint8_t get_port_id() {
+ return m_port_id;
+ }
+
+ TrexDpPortEvent::event_e get_event_type() {
+ return m_event_type;
+ }
+
+ int get_event_id() {
+ return m_event_id;
+ }
+
+private:
+ int m_thread_id;
+ uint8_t m_port_id;
+ TrexDpPortEvent::event_e m_event_type;
+ int m_event_id;
+
+};
#endif /* __TREX_STATELESS_MESSAGING_H__ */
+