summaryrefslogtreecommitdiffstats
path: root/src/stateless
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2016-05-03 14:57:34 +0300
committerimarom <imarom@cisco.com>2016-05-09 16:48:14 +0300
commit8691f4019dc2123c1aa7413cf3666138756c2f66 (patch)
tree4b09f137d266471b51a4e5270e8d113806c97c93 /src/stateless
parent64847bb6d182c73f7489a821ea5724687dab1bc1 (diff)
first remote PCAP push - draft
Diffstat (limited to 'src/stateless')
-rw-r--r--src/stateless/cp/trex_dp_port_events.cpp14
-rw-r--r--src/stateless/cp/trex_dp_port_events.h13
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp55
-rw-r--r--src/stateless/cp/trex_stateless_port.h6
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp120
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h12
-rw-r--r--src/stateless/dp/trex_stream_node.h129
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp20
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h38
9 files changed, 394 insertions, 13 deletions
diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp
index 1321a362..fc96e00a 100644
--- a/src/stateless/cp/trex_dp_port_events.cpp
+++ b/src/stateless/cp/trex_dp_port_events.cpp
@@ -78,6 +78,9 @@ protected:
virtual void on_event() {
/* do nothing */
}
+ virtual void on_error(int thread_id) {
+ /* do nothing */
+ }
};
void
@@ -105,14 +108,14 @@ TrexDpPortEvents::barrier() {
*
*/
void
-TrexDpPortEvents::on_core_reporting_in(int event_id, int thread_id) {
+TrexDpPortEvents::on_core_reporting_in(int event_id, int thread_id, bool status) {
TrexDpPortEvent *event = lookup(event_id);
/* event might have been deleted */
if (!event) {
return;
}
- bool done = event->on_core_reporting_in(thread_id);
+ bool done = event->on_core_reporting_in(thread_id, status);
if (done) {
destroy_event(event_id);
@@ -150,7 +153,7 @@ TrexDpPortEvent::init(TrexStatelessPort *port, int event_id, int timeout_ms) {
}
bool
-TrexDpPortEvent::on_core_reporting_in(int thread_id) {
+TrexDpPortEvent::on_core_reporting_in(int thread_id, bool status) {
/* mark sure no double signal */
if (m_signal.at(thread_id)) {
std::stringstream err;
@@ -163,6 +166,11 @@ TrexDpPortEvent::on_core_reporting_in(int thread_id) {
m_signal.at(thread_id) = true;
m_pending_cnt--;
+ /* if any core reported an error - mark as a failure */
+ if (!status) {
+ on_error(thread_id);
+ }
+
/* event occured */
if (m_pending_cnt == 0) {
on_event();
diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h
index 3b8c8633..681e47ab 100644
--- a/src/stateless/cp/trex_dp_port_events.h
+++ b/src/stateless/cp/trex_dp_port_events.h
@@ -48,13 +48,22 @@ protected:
*/
virtual void on_event() = 0;
+ /**
+ * when a thread ID encounter an error
+ *
+ * @author imarom (20-Apr-16)
+ *
+ * @param thread_id
+ */
+ virtual void on_error(int thread_id) = 0;
+
TrexStatelessPort *get_port() {
return m_port;
}
private:
void init(TrexStatelessPort *port, int event_id, int timeout_ms);
- bool on_core_reporting_in(int thread_id);
+ bool on_core_reporting_in(int thread_id, bool status = true);
std::unordered_map<int, bool> m_signal;
int m_pending_cnt;
@@ -98,7 +107,7 @@ public:
/**
* a core has reached the event
*/
- void on_core_reporting_in(int event_id, int thread_id);
+ void on_core_reporting_in(int event_id, int thread_id, bool status = true);
private:
TrexDpPortEvent *lookup(int event_id);
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 90142d9b..b09393f9 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -24,6 +24,7 @@ limitations under the License.
#include <trex_stateless_messaging.h>
#include <trex_streams_compiler.h>
#include <common/basic_utils.h>
+#include <common/captureFile.h>
#include <string>
@@ -70,6 +71,20 @@ protected:
assert(get_port()->m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID);
get_port()->m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
}
+
+ /**
+ * when a DP core encountered an error
+ *
+ * @author imarom (20-Apr-16)
+ */
+ virtual void on_error(int thread_id) {
+ Json::Value data;
+
+ data["port_id"] = get_port()->get_port_id();
+ data["thread_id"] = thread_id;
+
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_ERROR, data);
+ }
};
/***************************
@@ -395,6 +410,46 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul, bool force) {
}
+void
+TrexStatelessPort::push_remote(const std::string &pcap_filename, double ipg_usec, double speedup, uint32_t count) {
+ /* command allowed only on state stream */
+ verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS);
+
+ /* check that file exists */
+ CCapReaderBase *reader;
+ std::stringstream ss;
+ reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
+ if (!reader) {
+ throw TrexException(ss.str());
+ }
+ delete reader;
+
+ /* only one core gets to play */
+ int tx_core = m_cores_id_list[0];
+
+ /* create async event */
+ assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID);
+ m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent());
+
+ /* mark all other cores as done */
+ for (int index = 1; index < m_cores_id_list.size(); index++) {
+ /* mimic an end event */
+ m_dp_events.on_core_reporting_in(m_pending_async_stop_event, m_cores_id_list[index]);
+ }
+
+ /* send a message to core */
+ change_state(PORT_STATE_TX);
+ TrexStatelessCpToDpMsgBase *push_msg = new TrexStatelessDpPushPCAP(m_port_id,
+ m_pending_async_stop_event,
+ pcap_filename);
+ send_message_to_dp(tx_core, push_msg);
+
+ /* update subscribers */
+ Json::Value data;
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data);
+}
+
std::string
TrexStatelessPort::get_state_as_string() const {
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 520940d8..502c066d 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -212,6 +212,12 @@ public:
void update_traffic(const TrexPortMultiplier &mul, bool force);
/**
+ * push a PCAP file onto the port
+ *
+ */
+ void push_remote(const std::string &pcap_filename, double ipg_usec, double speedup, uint32_t count);
+
+ /**
* get the port state
*
*/
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index d3d49a34..31c907fa 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -262,6 +262,36 @@ bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
return (true);
}
+bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id, const std::string &pcap_filename){
+
+ /* push pcap can only happen on an idle port from the core prespective */
+ assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
+
+ CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
+ if (!pcap_node) {
+ return (false);
+ }
+
+ pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
+
+ uint8_t mac_addr[12];
+ m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
+
+ bool rc = pcap_node->create(port_id, pcap_filename, dir, mac_addr);
+ if (!rc) {
+ m_core->free_node((CGenNode *)pcap_node);
+ return (false);
+ }
+
+ /* schedule the node for now */
+ pcap_node->m_time = m_core->m_cur_time_sec;
+ m_core->m_node_gen.add_node((CGenNode *)pcap_node);
+
+ m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
+
+ return (true);
+}
+
bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
bool stop_on_id,
@@ -305,7 +335,6 @@ bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
void TrexStatelessDpPerPort::create(CFlowGenListPerThread * core){
m_core=core;
m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
- m_port_id=0;
m_active_streams=0;
m_active_nodes.clear();
}
@@ -579,6 +608,7 @@ void
TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
TrexStream * stream,
TrexStreamsCompiledObj *comp) {
+
CGenNodeStateless *node = m_core->create_node_sl();
/* add periodic */
@@ -834,6 +864,37 @@ TrexStatelessDpCore::pause_traffic(uint8_t port_id){
lp_port->pause_traffic(port_id);
}
+
+void
+TrexStatelessDpCore::push_pcap(uint8_t port_id, int event_id, const std::string &pcap_filename) {
+
+ TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
+
+ lp_port->set_event_id(event_id);
+
+ /* delegate the command to the port */
+ bool rc = lp_port->push_pcap(port_id, pcap_filename);
+ if (!rc) {
+ /* report back that we stopped */
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
+ TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
+ port_id,
+ event_id,
+ false);
+ ring->Enqueue((CGenNode *)event_msg);
+ return;
+ }
+
+ m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
+
+ #if 0
+ if ( duration > 0.0 ){
+ add_port_duration(duration, port_id, event_id);
+ }
+ #endif
+}
+
+
void
TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
@@ -895,3 +956,60 @@ TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
event_id);
ring->Enqueue((CGenNode *)event_msg);
}
+
+
+/**
+ * PCAP node
+ */
+bool CGenNodePCAP::create(uint8_t port_id, const std::string &pcap_filename, pkt_dir_t dir, const uint8_t *mac_addr) {
+ std::stringstream ss;
+
+ m_type = CGenNode::PCAP_PKT;
+ m_flags = 0;
+ m_src_port = 0;
+ m_port_id = port_id;
+
+ /* copy MAC addr info */
+ memcpy(m_mac_addr, mac_addr, 12);
+
+ /* set the dir */
+ set_mbuf_dir(dir);
+
+ /* create the PCAP reader */
+ m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
+ if (!m_reader) {
+ return false;
+ }
+
+ m_raw_packet = new CCapPktRaw();
+ if ( m_reader->ReadPacket(m_raw_packet) == false ){
+ /* handle error */
+ delete m_reader;
+ return (false);
+ }
+
+ /* this is the reference time */
+ //m_base_time = m_raw_packet->get_time();
+ m_last_pkt_time = m_raw_packet->get_time();
+
+ /* ready */
+ m_state = PCAP_ACTIVE;
+
+ return true;
+}
+
+void CGenNodePCAP::destroy() {
+
+ if (m_raw_packet) {
+ delete m_raw_packet;
+ m_raw_packet = NULL;
+ }
+
+ if (m_reader) {
+ delete m_reader;
+ m_reader = NULL;
+ }
+
+ m_state = PCAP_INVALID;
+}
+
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index cb102b8d..01033a7c 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -70,6 +70,8 @@ public:
bool update_traffic(uint8_t port_id, double factor);
+ bool push_pcap(uint8_t port_id, const std::string &pcap_filename);
+
bool stop_traffic(uint8_t port_id,
bool stop_on_id,
int event_id);
@@ -91,7 +93,6 @@ public:
public:
state_e m_state;
- uint8_t m_port_id;
uint32_t m_active_streams; /* how many active streams on this port */
@@ -149,7 +150,7 @@ public:
*/
void start_traffic(TrexStreamsCompiledObj *obj,
double duration,
- int m_event_id);
+ int event_id);
/* pause the streams, work only if all are continues */
@@ -161,6 +162,13 @@ public:
/**
+ * push a PCAP file on port
+ *
+ */
+ void push_pcap(uint8_t port_id, int event_id, const std::string &pcap_filename);
+
+
+ /**
* update current traffic rate
*
* @author imarom (25-Nov-15)
diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h
index c85bf8b5..8ccb5286 100644
--- a/src/stateless/dp/trex_stream_node.h
+++ b/src/stateless/dp/trex_stream_node.h
@@ -26,6 +26,8 @@ limitations under the License.
#include <stdio.h>
class TrexStatelessDpCore;
+class TrexStatelessDpPerPort;
+
#include <trex_stream.h>
class TrexStatelessCpToDpMsgBase;
@@ -387,6 +389,133 @@ private:
static_assert(sizeof(CGenNodeStateless) == sizeof(CGenNode), "sizeof(CGenNodeStateless) != sizeof(CGenNode)" );
+/* this is a event for PCAP transmitting */
+struct CGenNodePCAP : public CGenNodeBase {
+friend class TrexStatelessDpPerPort;
+
+public:
+
+ /**
+ * creates a node from a PCAP file
+ */
+ bool create(uint8_t port_id, const std::string &pcap_filename, pkt_dir_t dir, const uint8_t *mac_addr);
+ void destroy();
+
+ /**
+ * advance - will read the next packet
+ *
+ * @author imarom (03-May-16)
+ */
+ void next() {
+ assert(m_state == PCAP_ACTIVE);
+
+ /* save the previous packet time */
+ m_last_pkt_time = m_raw_packet->get_time();
+
+ /* advance */
+ if ( m_reader->ReadPacket(m_raw_packet) == false ){
+ m_state = PCAP_EOF;
+ return;
+ }
+
+ }
+
+ /**
+ * return true if the PCAP has next packet
+ *
+ */
+ bool has_next() {
+ assert(m_state != PCAP_INVALID);
+ return (m_state == PCAP_ACTIVE);
+ }
+
+ /**
+ * return the time for the next scheduling for a packet
+ *
+ */
+ inline double get_ipg() {
+ assert(m_state != PCAP_INVALID);
+ return m_raw_packet->get_time() - m_last_pkt_time;
+ //return 0.00001;
+ }
+
+ /**
+ * get the current packet as MBUF
+ *
+ */
+ inline rte_mbuf_t *get_pkt() {
+ assert(m_state != PCAP_INVALID);
+
+ rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), m_raw_packet->getTotalLen());
+ assert(m);
+
+ char *p = rte_pktmbuf_append(m, m_raw_packet->getTotalLen());
+ assert(p);
+ /* copy the packet */
+ memcpy(p, m_raw_packet->raw, m_raw_packet->getTotalLen());
+
+ /* fix the MAC */
+ memcpy(p, m_mac_addr, 12);
+
+ return (m);
+ }
+
+
+ inline void handle(CFlowGenListPerThread *thread) {
+ assert(m_state != PCAP_INVALID);
+ thread->m_node_gen.m_v_if->send_node( (CGenNode *)this);
+ }
+
+ void set_mbuf_dir(pkt_dir_t dir) {
+ if (dir) {
+ m_flags |=NODE_FLAGS_DIR;
+ }else{
+ m_flags &=~NODE_FLAGS_DIR;
+ }
+ }
+
+ inline pkt_dir_t get_mbuf_dir(){
+ return ((pkt_dir_t)( m_flags &1));
+ }
+
+ uint8_t get_port_id() {
+ return m_port_id;
+ }
+
+private:
+
+ enum {
+ PCAP_INVALID = 0,
+ PCAP_ACTIVE,
+ PCAP_EOF
+ };
+
+ /* cache line 0 */
+ /* important stuff here */
+ uint8_t m_mac_addr[12];
+ uint8_t m_state;
+
+ //double m_base_time;
+ //double m_current_pkt_time;
+ double m_last_pkt_time;
+
+ void * m_cache_mbuf;
+
+ double m_next_time_offset; /* in sec */
+
+ CCapReaderBase *m_reader;
+ CCapPktRaw *m_raw_packet;
+
+ uint8_t m_port_id;
+
+ /* pad to match the size of CGenNode */
+ uint8_t m_pad_end[25];
+
+} __rte_cache_aligned;
+
+
+static_assert(sizeof(CGenNodePCAP) == sizeof(CGenNode), "sizeof(CGenNodePCAP) != sizeof(CGenNode)" );
#endif /* __TREX_STREAM_NODE_H__ */
+
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index 7edf0f13..59e0a0a8 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -181,6 +181,24 @@ TrexStatelessDpUpdate::clone() {
return new_msg;
}
+
+/*************************
+ push PCAP message
+ ************************/
+bool
+TrexStatelessDpPushPCAP::handle(TrexStatelessDpCore *dp_core) {
+ dp_core->push_pcap(m_port_id, m_event_id, m_pcap_filename);
+ return true;
+}
+
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpPushPCAP::clone() {
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpPushPCAP(m_port_id, m_event_id, m_pcap_filename);
+
+ return new_msg;
+}
+
+
/*************************
barrier message
************************/
@@ -203,7 +221,7 @@ TrexStatelessDpBarrier::clone() {
bool
TrexDpPortEventMsg::handle() {
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(m_port_id);
- port->get_dp_events().on_core_reporting_in(m_event_id, m_thread_id);
+ port->get_dp_events().on_core_reporting_in(m_event_id, m_thread_id, get_status());
return (true);
}
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index 0eed01bd..8fb2a456 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -246,6 +246,29 @@ private:
double m_factor;
};
+
+/**
+ * psuh a PCAP message
+ */
+class TrexStatelessDpPushPCAP : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpPushPCAP(uint8_t port_id, int event_id, const std::string &pcap_filename) : m_pcap_filename(pcap_filename) {
+ m_port_id = port_id;
+ m_event_id = event_id;
+ }
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+private:
+ std::string m_pcap_filename;
+ uint8_t m_port_id;
+ int m_event_id;
+};
+
+
/**
* barrier message for DP core
*
@@ -267,6 +290,7 @@ private:
int m_event_id;
};
+
/************************* messages from DP to CP **********************/
/**
@@ -303,10 +327,11 @@ public:
class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase {
public:
- TrexDpPortEventMsg(int thread_id, uint8_t port_id, int event_id) {
- m_thread_id = thread_id;
- m_port_id = port_id;
- m_event_id = event_id;
+ TrexDpPortEventMsg(int thread_id, uint8_t port_id, int event_id, bool status = true) {
+ m_thread_id = thread_id;
+ m_port_id = port_id;
+ m_event_id = event_id;
+ m_status = status;
}
virtual bool handle();
@@ -323,10 +348,15 @@ public:
return m_event_id;
}
+ bool get_status() {
+ return m_status;
+ }
+
private:
int m_thread_id;
uint8_t m_port_id;
int m_event_id;
+ bool m_status;
};