summaryrefslogtreecommitdiffstats
path: root/src/stateless
diff options
context:
space:
mode:
authorHanoh Haim <hhaim@cisco.com>2016-05-10 17:49:25 +0300
committerHanoh Haim <hhaim@cisco.com>2016-05-10 17:49:25 +0300
commit2d37b9f98020a4458aaad1f3fd05ca5e408213e0 (patch)
tree3a8cd16eb748711b72df37c6f7eea4842d73290a /src/stateless
parent996f2451dba01f534420418eaac2856510682757 (diff)
parent63bf6aba10075a03fe6609369c1c7008afb85ba7 (diff)
merge from master
Diffstat (limited to 'src/stateless')
-rw-r--r--src/stateless/cp/trex_dp_port_events.cpp14
-rw-r--r--src/stateless/cp/trex_dp_port_events.h13
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp78
-rw-r--r--src/stateless/cp/trex_stateless_port.h27
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp192
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h29
-rw-r--r--src/stateless/dp/trex_stream_node.h172
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp32
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h52
9 files changed, 575 insertions, 34 deletions
diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp
index 1321a362..fc96e00a 100644
--- a/src/stateless/cp/trex_dp_port_events.cpp
+++ b/src/stateless/cp/trex_dp_port_events.cpp
@@ -78,6 +78,9 @@ protected:
virtual void on_event() {
/* do nothing */
}
+ virtual void on_error(int thread_id) {
+ /* do nothing */
+ }
};
void
@@ -105,14 +108,14 @@ TrexDpPortEvents::barrier() {
*
*/
void
-TrexDpPortEvents::on_core_reporting_in(int event_id, int thread_id) {
+TrexDpPortEvents::on_core_reporting_in(int event_id, int thread_id, bool status) {
TrexDpPortEvent *event = lookup(event_id);
/* event might have been deleted */
if (!event) {
return;
}
- bool done = event->on_core_reporting_in(thread_id);
+ bool done = event->on_core_reporting_in(thread_id, status);
if (done) {
destroy_event(event_id);
@@ -150,7 +153,7 @@ TrexDpPortEvent::init(TrexStatelessPort *port, int event_id, int timeout_ms) {
}
bool
-TrexDpPortEvent::on_core_reporting_in(int thread_id) {
+TrexDpPortEvent::on_core_reporting_in(int thread_id, bool status) {
/* mark sure no double signal */
if (m_signal.at(thread_id)) {
std::stringstream err;
@@ -163,6 +166,11 @@ TrexDpPortEvent::on_core_reporting_in(int thread_id) {
m_signal.at(thread_id) = true;
m_pending_cnt--;
+ /* if any core reported an error - mark as a failure */
+ if (!status) {
+ on_error(thread_id);
+ }
+
/* event occured */
if (m_pending_cnt == 0) {
on_event();
diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h
index 3b8c8633..681e47ab 100644
--- a/src/stateless/cp/trex_dp_port_events.h
+++ b/src/stateless/cp/trex_dp_port_events.h
@@ -48,13 +48,22 @@ protected:
*/
virtual void on_event() = 0;
+ /**
+ * when a thread ID encounter an error
+ *
+ * @author imarom (20-Apr-16)
+ *
+ * @param thread_id
+ */
+ virtual void on_error(int thread_id) = 0;
+
TrexStatelessPort *get_port() {
return m_port;
}
private:
void init(TrexStatelessPort *port, int event_id, int timeout_ms);
- bool on_core_reporting_in(int thread_id);
+ bool on_core_reporting_in(int thread_id, bool status = true);
std::unordered_map<int, bool> m_signal;
int m_pending_cnt;
@@ -98,7 +107,7 @@ public:
/**
* a core has reached the event
*/
- void on_core_reporting_in(int event_id, int thread_id);
+ void on_core_reporting_in(int event_id, int thread_id, bool status = true);
private:
TrexDpPortEvent *lookup(int event_id);
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 90142d9b..360cc7d6 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -24,6 +24,7 @@ limitations under the License.
#include <trex_stateless_messaging.h>
#include <trex_streams_compiler.h>
#include <common/basic_utils.h>
+#include <common/captureFile.h>
#include <string>
@@ -70,6 +71,20 @@ protected:
assert(get_port()->m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID);
get_port()->m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
}
+
+ /**
+ * when a DP core encountered an error
+ *
+ * @author imarom (20-Apr-16)
+ */
+ virtual void on_error(int thread_id) {
+ Json::Value data;
+
+ data["port_id"] = get_port()->get_port_id();
+ data["thread_id"] = thread_id;
+
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_ERROR, data);
+ }
};
/***************************
@@ -240,6 +255,13 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
}
+bool TrexStatelessPort::is_active() const {
+ return ( (m_port_state == PORT_STATE_TX)
+ || (m_port_state == PORT_STATE_PAUSE)
+ || (m_port_state == PORT_STATE_PCAP_TX)
+ );
+}
+
/**
* stop traffic on port
*
@@ -249,9 +271,7 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
*/
void
TrexStatelessPort::stop_traffic(void) {
-
- if (!( (m_port_state == PORT_STATE_TX)
- || (m_port_state == PORT_STATE_PAUSE) )) {
+ if (!is_active()) {
return;
}
@@ -395,6 +415,55 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul, bool force) {
}
+void
+TrexStatelessPort::push_remote(const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count,
+ double duration) {
+
+ /* command allowed only on state stream */
+ verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS);
+
+ /* check that file exists */
+ CCapReaderBase *reader;
+ std::stringstream ss;
+ reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
+ if (!reader) {
+ throw TrexException(ss.str());
+ }
+ delete reader;
+
+ /* only one core gets to play */
+ int tx_core = m_cores_id_list[0];
+
+ /* create async event */
+ assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID);
+ m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent());
+
+ /* mark all other cores as done */
+ for (int index = 1; index < m_cores_id_list.size(); index++) {
+ /* mimic an end event */
+ m_dp_events.on_core_reporting_in(m_pending_async_stop_event, m_cores_id_list[index]);
+ }
+
+ /* send a message to core */
+ change_state(PORT_STATE_PCAP_TX);
+ TrexStatelessCpToDpMsgBase *push_msg = new TrexStatelessDpPushPCAP(m_port_id,
+ m_pending_async_stop_event,
+ pcap_filename,
+ ipg_usec,
+ speedup,
+ count,
+ duration);
+ send_message_to_dp(tx_core, push_msg);
+
+ /* update subscribers */
+ Json::Value data;
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data);
+}
+
std::string
TrexStatelessPort::get_state_as_string() const {
@@ -413,6 +482,9 @@ TrexStatelessPort::get_state_as_string() const {
case PORT_STATE_PAUSE:
return "PAUSE";
+
+ case PORT_STATE_PCAP_TX:
+ return "PCAP_TX";
}
return "UNKNOWN";
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 520940d8..8856b429 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -129,11 +129,12 @@ public:
* port state
*/
enum port_state_e {
- PORT_STATE_DOWN = 0x1,
- PORT_STATE_IDLE = 0x2,
- PORT_STATE_STREAMS = 0x4,
- PORT_STATE_TX = 0x8,
- PORT_STATE_PAUSE = 0x10,
+ PORT_STATE_DOWN = 0x1,
+ PORT_STATE_IDLE = 0x2,
+ PORT_STATE_STREAMS = 0x4,
+ PORT_STATE_TX = 0x8,
+ PORT_STATE_PAUSE = 0x10,
+ PORT_STATE_PCAP_TX = 0x20,
};
/**
@@ -212,6 +213,16 @@ public:
void update_traffic(const TrexPortMultiplier &mul, bool force);
/**
+ * push a PCAP file onto the port
+ *
+ */
+ void push_remote(const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count,
+ double duration);
+
+ /**
* get the port state
*
*/
@@ -220,6 +231,12 @@ public:
}
/**
+ * return true if the port is active
+ * (paused is considered active)
+ */
+ bool is_active() const;
+
+ /**
* port state as string
*
*/
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index 13bf5a5d..c5963625 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -358,6 +358,51 @@ bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
return (true);
}
+bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count) {
+
+ /* push pcap can only happen on an idle port from the core prespective */
+ assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
+
+ CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
+ if (!pcap_node) {
+ return (false);
+ }
+
+ pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
+ socket_id_t socket_id = m_core->m_node_gen.m_socket_id;
+
+ uint8_t mac_addr[12];
+ m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
+
+ bool rc = pcap_node->create(port_id,
+ dir,
+ socket_id,
+ mac_addr,
+ pcap_filename,
+ ipg_usec,
+ speedup,
+ count);
+ if (!rc) {
+ m_core->free_node((CGenNode *)pcap_node);
+ return (false);
+ }
+
+ /* schedule the node for now */
+ pcap_node->m_time = m_core->m_cur_time_sec;
+ m_core->m_node_gen.add_node((CGenNode *)pcap_node);
+
+ /* hold a pointer to the node */
+ assert(m_active_pcap_node == NULL);
+ m_active_pcap_node = pcap_node;
+
+ m_state = TrexStatelessDpPerPort::ppSTATE_PCAP_TX;
+ return (true);
+}
+
bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
bool stop_on_id,
@@ -390,6 +435,19 @@ bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
}
}
+ /* check for active PCAP node */
+ if (m_active_pcap_node) {
+ /* when got async stop from outside or duration */
+ if (m_active_pcap_node->is_active()) {
+ m_active_pcap_node->mark_for_free();
+ } else {
+ /* graceful stop - node was put out by the scheduler */
+ m_core->free_node( (CGenNode *)m_active_pcap_node);
+ }
+
+ m_active_pcap_node = NULL;
+ }
+
/* active stream should be zero */
assert(m_active_streams==0);
m_active_nodes.clear();
@@ -401,9 +459,9 @@ bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
void TrexStatelessDpPerPort::create(CFlowGenListPerThread * core){
m_core=core;
m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
- m_port_id=0;
m_active_streams=0;
m_active_nodes.clear();
+ m_active_pcap_node = NULL;
}
@@ -709,6 +767,7 @@ void
TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
TrexStream * stream,
TrexStreamsCompiledObj *comp) {
+
CGenNodeStateless *node = m_core->create_node_sl();
node->cache_mbuf_array_init();
@@ -977,6 +1036,42 @@ TrexStatelessDpCore::pause_traffic(uint8_t port_id){
lp_port->pause_traffic(port_id);
}
+
+void
+TrexStatelessDpCore::push_pcap(uint8_t port_id,
+ int event_id,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count,
+ double duration) {
+
+ TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
+
+ lp_port->set_event_id(event_id);
+
+ /* delegate the command to the port */
+ bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, speedup, count);
+ if (!rc) {
+ /* report back that we stopped */
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
+ TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
+ port_id,
+ event_id,
+ false);
+ ring->Enqueue((CGenNode *)event_msg);
+ return;
+ }
+
+
+ if (duration > 0.0) {
+ add_port_duration(duration, port_id, event_id);
+ }
+
+ m_state = TrexStatelessDpCore::STATE_PCAP_TX;
+}
+
+
void
TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
@@ -995,21 +1090,10 @@ TrexStatelessDpCore::stop_traffic(uint8_t port_id,
the scheduler invokes it, it will be free */
TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
-
if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
- /* nothing to do ! already stopped */
- //printf(" skip .. %f\n",m_core->m_cur_time_sec);
return;
}
- /* inform the control plane we stopped - this might be a async stop
- (streams ended)
- */
- #if 0
- if ( are_all_ports_idle() ) {
- /* just a place holder if we will need to do somthing in that case */
- }
- #endif
CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
@@ -1038,3 +1122,87 @@ TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
event_id);
ring->Enqueue((CGenNode *)event_msg);
}
+
+
+/**
+ * PCAP node
+ */
+bool CGenNodePCAP::create(uint8_t port_id,
+ pkt_dir_t dir,
+ socket_id_t socket_id,
+ const uint8_t *mac_addr,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count) {
+ std::stringstream ss;
+
+ m_type = CGenNode::PCAP_PKT;
+ m_flags = 0;
+ m_src_port = 0;
+ m_port_id = port_id;
+ m_count = count;
+
+ /* mark this node as slow path */
+ set_slow_path(true);
+
+ if (ipg_usec != -1) {
+ /* fixed IPG */
+ m_ipg_sec = usec_to_sec(ipg_usec / speedup);
+ m_speedup = 0;
+ } else {
+ /* packet IPG */
+ m_ipg_sec = -1;
+ m_speedup = speedup;
+ }
+
+ /* copy MAC addr info */
+ memcpy(m_mac_addr, mac_addr, 12);
+
+ /* set the dir */
+ set_mbuf_dir(dir);
+ set_socket_id(socket_id);
+
+ /* create the PCAP reader */
+ m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
+ if (!m_reader) {
+ return false;
+ }
+
+ m_raw_packet = new CCapPktRaw();
+ if ( m_reader->ReadPacket(m_raw_packet) == false ){
+ /* handle error */
+ delete m_reader;
+ return (false);
+ }
+
+ /* this is the reference time */
+ //m_base_time = m_raw_packet->get_time();
+ m_last_pkt_time = m_raw_packet->get_time();
+
+ /* ready */
+ m_state = PCAP_ACTIVE;
+
+ return true;
+}
+
+/**
+ * cleanup for PCAP node
+ *
+ * @author imarom (08-May-16)
+ */
+void CGenNodePCAP::destroy() {
+
+ if (m_raw_packet) {
+ delete m_raw_packet;
+ m_raw_packet = NULL;
+ }
+
+ if (m_reader) {
+ delete m_reader;
+ m_reader = NULL;
+ }
+
+ m_state = PCAP_INVALID;
+}
+
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index bdf84cfd..af2187ae 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -33,7 +33,7 @@ class CFlowGenListPerThread;
class CGenNodeStateless;
class TrexStreamsCompiledObj;
class TrexStream;
-
+class CGenNodePCAP;
class CDpOneStream {
public:
@@ -54,7 +54,8 @@ public:
enum state_e {
ppSTATE_IDLE,
ppSTATE_TRANSMITTING,
- ppSTATE_PAUSE
+ ppSTATE_PAUSE,
+ ppSTATE_PCAP_TX,
};
@@ -70,6 +71,12 @@ public:
bool update_traffic(uint8_t port_id, double factor);
+ bool push_pcap(uint8_t port_id,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count);
+
bool stop_traffic(uint8_t port_id,
bool stop_on_id,
int event_id);
@@ -91,11 +98,11 @@ public:
public:
state_e m_state;
- uint8_t m_port_id;
uint32_t m_active_streams; /* how many active streams on this port */
std::vector<CDpOneStream> m_active_nodes; /* holds the current active nodes */
+ CGenNodePCAP *m_active_pcap_node;
CFlowGenListPerThread * m_core ;
int m_event_id;
};
@@ -113,6 +120,7 @@ public:
enum state_e {
STATE_IDLE,
STATE_TRANSMITTING,
+ STATE_PCAP_TX,
STATE_TERMINATE
};
@@ -151,7 +159,7 @@ public:
*/
void start_traffic(TrexStreamsCompiledObj *obj,
double duration,
- int m_event_id);
+ int event_id);
/* pause the streams, work only if all are continues */
@@ -163,6 +171,19 @@ public:
/**
+ * push a PCAP file on port
+ *
+ */
+ void push_pcap(uint8_t port_id,
+ int event_id,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count,
+ double duration);
+
+
+ /**
* update current traffic rate
*
* @author imarom (25-Nov-15)
diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h
index e9e5cf5b..b5395e78 100644
--- a/src/stateless/dp/trex_stream_node.h
+++ b/src/stateless/dp/trex_stream_node.h
@@ -26,6 +26,8 @@ limitations under the License.
#include <stdio.h>
class TrexStatelessDpCore;
+class TrexStatelessDpPerPort;
+
#include <trex_stream.h>
class TrexStatelessCpToDpMsgBase;
@@ -448,6 +450,176 @@ private:
static_assert(sizeof(CGenNodeStateless) == sizeof(CGenNode), "sizeof(CGenNodeStateless) != sizeof(CGenNode)" );
+/* this is a event for PCAP transmitting */
+struct CGenNodePCAP : public CGenNodeBase {
+friend class TrexStatelessDpPerPort;
+
+public:
+
+ /**
+ * creates a node from a PCAP file
+ */
+ bool create(uint8_t port_id,
+ pkt_dir_t dir,
+ socket_id_t socket_id,
+ const uint8_t *mac_addr,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count);
+ /**
+ * destroy the node cleaning up any data
+ *
+ */
+ void destroy();
+
+ /**
+ * advance - will read the next packet
+ *
+ * @author imarom (03-May-16)
+ */
+ void next() {
+ assert(is_active());
+
+ /* save the previous packet time */
+ m_last_pkt_time = m_raw_packet->get_time();
+
+ /* advance */
+ if ( m_reader->ReadPacket(m_raw_packet) == false ){
+ m_count--;
+
+ /* if its the end - go home... */
+ if (m_count == 0) {
+ m_state = PCAP_INACTIVE;
+ return;
+ }
+
+ /* rewind and load the first packet */
+ m_reader->Rewind();
+ if (!m_reader->ReadPacket(m_raw_packet)) {
+ m_state = PCAP_INACTIVE;
+ return;
+ }
+ }
+
+ }
+
+ /**
+ * return the time for the next scheduling for a packet
+ *
+ */
+ inline double get_ipg() {
+ assert(m_state != PCAP_INVALID);
+
+ /* fixed IPG */
+ if (m_ipg_sec != -1) {
+ return m_ipg_sec;
+ } else {
+ return ((m_raw_packet->get_time() - m_last_pkt_time) / m_speedup);
+ }
+ }
+
+ /**
+ * get the current packet as MBUF
+ *
+ */
+ inline rte_mbuf_t *get_pkt() {
+ assert(m_state != PCAP_INVALID);
+
+ rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), m_raw_packet->getTotalLen());
+ assert(m);
+
+ char *p = rte_pktmbuf_append(m, m_raw_packet->getTotalLen());
+ assert(p);
+
+ /* copy the packet */
+ memcpy(p, m_raw_packet->raw, m_raw_packet->getTotalLen());
+
+ /* fix the MAC */
+ memcpy(p, m_mac_addr, 12);
+
+ return (m);
+ }
+
+
+ inline void handle(CFlowGenListPerThread *thread) {
+ assert(m_state != PCAP_INVALID);
+ thread->m_node_gen.m_v_if->send_node( (CGenNode *)this);
+
+ // read the next packet
+ next();
+
+ if (is_active()) {
+ m_time += get_ipg();
+ thread->m_node_gen.m_p_queue.push((CGenNode *)this);
+
+ } else {
+ thread->stop_stateless_traffic(get_port_id());
+ }
+ }
+
+ void set_mbuf_dir(pkt_dir_t dir) {
+ if (dir) {
+ m_flags |=NODE_FLAGS_DIR;
+ }else{
+ m_flags &=~NODE_FLAGS_DIR;
+ }
+ }
+
+ inline pkt_dir_t get_mbuf_dir(){
+ return ((pkt_dir_t)( m_flags &1));
+ }
+
+ uint8_t get_port_id() {
+ return m_port_id;
+ }
+
+ void mark_for_free() {
+ m_state = PCAP_MARKED_FOR_FREE;
+ }
+
+ bool is_active() {
+ return (m_state == PCAP_ACTIVE);
+ }
+
+ bool is_marked_for_free() {
+ return (m_state == PCAP_MARKED_FOR_FREE);
+ }
+
+private:
+
+ enum {
+ PCAP_INVALID = 0,
+ PCAP_ACTIVE,
+ PCAP_INACTIVE,
+ PCAP_MARKED_FOR_FREE
+ };
+
+ /* cache line 0 */
+ /* important stuff here */
+ uint8_t m_mac_addr[12];
+ uint8_t m_state;
+
+ double m_last_pkt_time;
+ double m_speedup;
+ double m_ipg_sec;
+ uint32_t m_count;
+
+ double m_next_time_offset; /* in sec */
+
+ CCapReaderBase *m_reader;
+ CCapPktRaw *m_raw_packet;
+
+ uint8_t m_port_id;
+
+ /* pad to match the size of CGenNode */
+ uint8_t m_pad_end[33];
+
+} __rte_cache_aligned;
+
+
+static_assert(sizeof(CGenNodePCAP) == sizeof(CGenNode), "sizeof(CGenNodePCAP) != sizeof(CGenNode)" );
#endif /* __TREX_STREAM_NODE_H__ */
+
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index 7edf0f13..1cbacb6f 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -181,6 +181,36 @@ TrexStatelessDpUpdate::clone() {
return new_msg;
}
+
+/*************************
+ push PCAP message
+ ************************/
+bool
+TrexStatelessDpPushPCAP::handle(TrexStatelessDpCore *dp_core) {
+ dp_core->push_pcap(m_port_id,
+ m_event_id,
+ m_pcap_filename,
+ m_ipg_usec,
+ m_speedup,
+ m_count,
+ m_duration);
+ return true;
+}
+
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpPushPCAP::clone() {
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpPushPCAP(m_port_id,
+ m_event_id,
+ m_pcap_filename,
+ m_ipg_usec,
+ m_speedup,
+ m_count,
+ m_duration);
+
+ return new_msg;
+}
+
+
/*************************
barrier message
************************/
@@ -203,7 +233,7 @@ TrexStatelessDpBarrier::clone() {
bool
TrexDpPortEventMsg::handle() {
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(m_port_id);
- port->get_dp_events().on_core_reporting_in(m_event_id, m_thread_id);
+ port->get_dp_events().on_core_reporting_in(m_event_id, m_thread_id, get_status());
return (true);
}
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index 0eed01bd..9b1f2e31 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -246,6 +246,43 @@ private:
double m_factor;
};
+
+/**
+ * psuh a PCAP message
+ */
+class TrexStatelessDpPushPCAP : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpPushPCAP(uint8_t port_id,
+ int event_id,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count,
+ double duration) : m_pcap_filename(pcap_filename) {
+ m_port_id = port_id;
+ m_event_id = event_id;
+ m_ipg_usec = ipg_usec;
+ m_speedup = speedup;
+ m_count = count;
+ m_duration = duration;
+ }
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+private:
+ std::string m_pcap_filename;
+ int m_event_id;
+ double m_ipg_usec;
+ double m_speedup;
+ double m_duration;
+ uint32_t m_count;
+ uint8_t m_port_id;
+};
+
+
/**
* barrier message for DP core
*
@@ -267,6 +304,7 @@ private:
int m_event_id;
};
+
/************************* messages from DP to CP **********************/
/**
@@ -303,10 +341,11 @@ public:
class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase {
public:
- TrexDpPortEventMsg(int thread_id, uint8_t port_id, int event_id) {
- m_thread_id = thread_id;
- m_port_id = port_id;
- m_event_id = event_id;
+ TrexDpPortEventMsg(int thread_id, uint8_t port_id, int event_id, bool status = true) {
+ m_thread_id = thread_id;
+ m_port_id = port_id;
+ m_event_id = event_id;
+ m_status = status;
}
virtual bool handle();
@@ -323,10 +362,15 @@ public:
return m_event_id;
}
+ bool get_status() {
+ return m_status;
+ }
+
private:
int m_thread_id;
uint8_t m_port_id;
int m_event_id;
+ bool m_status;
};