summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2016-05-03 14:57:34 +0300
committerimarom <imarom@cisco.com>2016-05-09 16:48:14 +0300
commit8691f4019dc2123c1aa7413cf3666138756c2f66 (patch)
tree4b09f137d266471b51a4e5270e8d113806c97c93 /src
parent64847bb6d182c73f7489a821ea5724687dab1bc1 (diff)
first remote PCAP push - draft
Diffstat (limited to 'src')
-rwxr-xr-xsrc/bp_sim.cpp142
-rwxr-xr-xsrc/bp_sim.h11
-rwxr-xr-xsrc/common/captureFile.cpp16
-rwxr-xr-xsrc/common/captureFile.h6
-rw-r--r--src/main_dpdk.cpp24
-rw-r--r--src/publisher/trex_publisher.h1
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_general.cpp28
-rw-r--r--src/rpc-server/commands/trex_rpc_cmds.h2
-rw-r--r--src/rpc-server/trex_rpc_cmds_table.cpp1
-rw-r--r--src/stateless/cp/trex_dp_port_events.cpp14
-rw-r--r--src/stateless/cp/trex_dp_port_events.h13
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp55
-rw-r--r--src/stateless/cp/trex_stateless_port.h6
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp120
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h12
-rw-r--r--src/stateless/dp/trex_stream_node.h129
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp20
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h38
18 files changed, 549 insertions, 89 deletions
diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp
index 94f8a2ba..2491d122 100755
--- a/src/bp_sim.cpp
+++ b/src/bp_sim.cpp
@@ -3645,78 +3645,90 @@ int CNodeGenerator::flush_file(dsec_t max_time,
uint8_t type=node->m_type;
if ( type == CGenNode::STATELESS_PKT ) {
- m_p_queue.pop();
- CGenNodeStateless *node_sl = (CGenNodeStateless *)node;
+ m_p_queue.pop();
+ CGenNodeStateless *node_sl = (CGenNodeStateless *)node;
- /* if the stream has been deactivated - end */
- if ( unlikely( node_sl->is_mask_for_free() ) ) {
- thread->free_node(node);
- } else {
+ /* if the stream has been deactivated - end */
+ if ( unlikely( node_sl->is_mask_for_free() ) ) {
+ thread->free_node(node);
+ } else {
- /* count before handle - node might be destroyed */
- #ifdef TREX_SIM
- update_stl_stats(node_sl);
- #endif
+ /* count before handle - node might be destroyed */
+ #ifdef TREX_SIM
+ update_stl_stats(node_sl);
+ #endif
- node_sl->handle(thread);
+ node_sl->handle(thread);
- #ifdef TREX_SIM
- if (has_limit_reached()) {
- thread->m_stateless_dp_info.stop_traffic(node_sl->get_port_id(), false, 0);
- }
- #endif
+ #ifdef TREX_SIM
+ if (has_limit_reached()) {
+ thread->m_stateless_dp_info.stop_traffic(node_sl->get_port_id(), false, 0);
+ }
+ #endif
- }
+ }
- }else{
- if ( likely( type == CGenNode::FLOW_PKT ) ) {
- /* PKT */
- if ( !(node->is_repeat_flow()) || (always==false)) {
- flush_one_node_to_file(node);
- #ifdef _DEBUG
- update_stats(node);
- #endif
+ } else if ( likely( type == CGenNode::FLOW_PKT ) ) {
+ /* PKT */
+ if ( !(node->is_repeat_flow()) || (always==false)) {
+ flush_one_node_to_file(node);
+ #ifdef _DEBUG
+ update_stats(node);
+ #endif
+ }
+ m_p_queue.pop();
+ if ( node->is_last_in_flow() ) {
+ if ((node->is_repeat_flow()) && (always==false)) {
+ /* Flow is repeated, reschedule it */
+ thread->reschedule_flow( node);
+ } else {
+ /* Flow will not be repeated, so free node */
+ thread->free_last_flow_node( node);
}
- m_p_queue.pop();
- if ( node->is_last_in_flow() ) {
- if ((node->is_repeat_flow()) && (always==false)) {
- /* Flow is repeated, reschedule it */
- thread->reschedule_flow( node);
- }else{
- /* Flow will not be repeated, so free node */
- thread->free_last_flow_node( node);
- }
- }else{
- node->update_next_pkt_in_flow();
+ } else {
+ node->update_next_pkt_in_flow();
+ m_p_queue.push(node);
+ }
+ } else if ((type == CGenNode::FLOW_FIF)) {
+ /* callback to our method */
+ m_p_queue.pop();
+ if ( always == false) {
+ thread->m_cur_time_sec = node->m_time ;
+
+ if ( thread->generate_flows_roundrobin(&done) <0){
+ break;
+ }
+ if (!done) {
+ node->m_time +=d_time;
m_p_queue.push(node);
+ } else {
+ thread->free_node(node);
}
- }else{
- if ((type == CGenNode::FLOW_FIF)) {
- /* callback to our method */
- m_p_queue.pop();
- if ( always == false) {
- thread->m_cur_time_sec = node->m_time ;
-
- if ( thread->generate_flows_roundrobin(&done) <0){
- break;
- }
- if (!done) {
- node->m_time +=d_time;
- m_p_queue.push(node);
- }else{
- thread->free_node(node);
- }
- }else{
- thread->free_node(node);
- }
+ } else {
+ thread->free_node(node);
+ }
+
+ } else if (type == CGenNode::PCAP_PKT) {
+ m_p_queue.pop();
+
+ CGenNodePCAP *node_pcap = (CGenNodePCAP *)node;
+ node_pcap->handle(thread);
+
+ if (node_pcap->has_next()) {
+ node_pcap->next();
+ node_pcap->m_time += node_pcap->get_ipg();
+ m_p_queue.push(node);
+ } else {
+ thread->free_node(node);
+ thread->m_stateless_dp_info.stop_traffic(node_pcap->get_port_id(), false, 0);
- }else{
- bool exit_sccheduler = handle_slow_messages(type,node,thread,always);
- if (exit_sccheduler) {
- break;
- }
- }
+ }
+
+ } else {
+ bool exit_sccheduler = handle_slow_messages(type,node,thread,always);
+ if (exit_sccheduler) {
+ break;
}
}
}
@@ -6212,10 +6224,18 @@ void CGenNodeBase::free_base(){
CGenNodeStateless* p=(CGenNodeStateless*)this;
p->free_stl_node();
return;
+ }
+
+ if (m_type == PCAP_PKT) {
+ CGenNodePCAP *p = (CGenNodePCAP *)this;
+ p->destroy();
+ return;
}
+
if ( m_type == COMMAND ) {
CGenNodeCommand* p=(CGenNodeCommand*)this;
p->free_command();
}
}
+
diff --git a/src/bp_sim.h b/src/bp_sim.h
index 1ec036c0..7399137b 100755
--- a/src/bp_sim.h
+++ b/src/bp_sim.h
@@ -61,6 +61,8 @@ limitations under the License.
#include <trex_stateless_dp_core.h>
+class CGenNodePCAP;
+
#undef NAT_TRACE_
#define FORCE_NO_INLINE __attribute__ ((noinline))
@@ -1419,7 +1421,9 @@ public:
EXIT_SCHED =6,
COMMAND =7,
- EXIT_PORT_SCHED =8
+ EXIT_PORT_SCHED =8,
+
+ PCAP_PKT =9,
};
@@ -3558,10 +3562,14 @@ public :
inline CGenNode * create_node(void);
+
inline CGenNodeStateless * create_node_sl(void){
return ((CGenNodeStateless*)create_node() );
}
+ inline CGenNodePCAP * allocate_pcap_node(void) {
+ return ((CGenNodePCAP*)create_node());
+ }
inline void free_node(CGenNode *p);
inline void free_last_flow_node(CGenNode *p);
@@ -3583,7 +3591,6 @@ public:
bool set_stateless_next_node( CGenNodeStateless * cur_node,
CGenNodeStateless * next_node);
-
void Dump(FILE *fd);
void DumpCsv(FILE *fd);
void DumpStats(FILE *fd);
diff --git a/src/common/captureFile.cpp b/src/common/captureFile.cpp
index e73c37ad..4c50bcb2 100755
--- a/src/common/captureFile.cpp
+++ b/src/common/captureFile.cpp
@@ -244,28 +244,23 @@ bool CErfCmp::compare(std::string f1, std::string f2 ){
return (res);
}
-
-
/**
* try to create type by type
* @param name
*
* @return CCapReaderBase*
*/
-CCapReaderBase * CCapReaderFactory::CreateReader(char * name, int loops)
+CCapReaderBase * CCapReaderFactory::CreateReader(char * name, int loops, std::ostream &err)
{
- if (name == NULL) {
- printf("Got null file name\n");
- return NULL;
- }
+ assert(name);
/* make sure we have a file */
FILE * f = CAP_FOPEN_64(name, "rb");
if (f == NULL) {
if (errno == ENOENT) {
- printf("\nERROR: Cap file not found %s\n\n",name);
+ err << "CAP file '" << name << "' not found";
} else {
- printf("\nERROR: Failed to open cap file '%s' with errno %d\n\n", name, errno);
+ err << "failed to open CAP file '" << name << "' with errno " << errno;
}
return NULL;
}
@@ -281,8 +276,7 @@ CCapReaderBase * CCapReaderFactory::CreateReader(char * name, int loops)
delete next;
}
- printf("\nERROR: file %s format not supported",name);
- printf("\nERROR: formats supported are LIBPCAP and ERF. other formats are deprecated\n\n");
+ err << "unsupported CAP format (not PCAP or ERF): " << name << "\n";
return NULL;
}
diff --git a/src/common/captureFile.h b/src/common/captureFile.h
index 3be83432..32b98272 100755
--- a/src/common/captureFile.h
+++ b/src/common/captureFile.h
@@ -24,6 +24,8 @@ limitations under the License.
#include <math.h>
#include <stdlib.h>
#include <string>
+#include <iostream>
+
#ifdef WIN32
#pragma warning(disable:4786)
#endif
@@ -201,11 +203,13 @@ public:
* @param name - cature file name
* @param loops - number of loops for the same capture. use 0
* for one time transmition
+ * @param err - IO stream to print error
+ *
* @return CCapReaderBase* - pointer to new instance (allocated
* by the function). the user should release the
* instance once it has no use any more.
*/
- static CCapReaderBase * CreateReader(char * name, int loops = 0);
+ static CCapReaderBase * CreateReader(char * name, int loops = 0, std::ostream &err = std::cout);
private:
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index 6dec3dec..3c345aa5 100644
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -1773,6 +1773,8 @@ protected:
class CCoreEthIFStateless : public CCoreEthIF {
public:
virtual int send_node(CGenNode * node);
+protected:
+ int send_pcap_node(CGenNodePCAP *pcap_node);
};
bool CCoreEthIF::Create(uint8_t core_id,
@@ -1998,7 +2000,13 @@ void CCoreEthIF::update_mac_addr(CGenNode * node,uint8_t *p){
-int CCoreEthIFStateless::send_node(CGenNode * no){
+int CCoreEthIFStateless::send_node(CGenNode * no) {
+
+ /* slow path - PCAP nodes */
+ if (no->m_type == CGenNode::PCAP_PKT) {
+ return send_pcap_node((CGenNodePCAP *)no);
+ }
+
CGenNodeStateless * node_sl=(CGenNodeStateless *) no;
/* check that we have mbuf */
rte_mbuf_t * m=node_sl->get_cache_mbuf();
@@ -2027,6 +2035,20 @@ int CCoreEthIFStateless::send_node(CGenNode * no){
return (0);
};
+int CCoreEthIFStateless::send_pcap_node(CGenNodePCAP *pcap_node) {
+ rte_mbuf_t *m = pcap_node->get_pkt();
+ if (!m) {
+ return (-1);
+ }
+
+ pkt_dir_t dir = (pkt_dir_t)pcap_node->get_mbuf_dir();
+ CCorePerPort *lp_port=&m_ports[dir];
+ CVirtualIFPerSideStats *lp_stats = &m_stats[dir];
+
+ send_pkt(lp_port, m, lp_stats);
+
+ return (0);
+}
int CCoreEthIF::send_node(CGenNode * node){
diff --git a/src/publisher/trex_publisher.h b/src/publisher/trex_publisher.h
index f8843758..1d283478 100644
--- a/src/publisher/trex_publisher.h
+++ b/src/publisher/trex_publisher.h
@@ -48,6 +48,7 @@ public:
EVENT_PORT_FINISHED_TX = 4,
EVENT_PORT_ACQUIRED = 5,
EVENT_PORT_RELEASED = 6,
+ EVENT_PORT_ERROR = 7,
EVENT_SERVER_STOPPED = 100,
diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
index f7a23188..d48c770e 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
@@ -451,3 +451,31 @@ TrexRpcPublishNow::_run(const Json::Value &params, Json::Value &result) {
return (TREX_RPC_CMD_OK);
}
+
+
+/**
+ * push a remote PCAP on a port
+ *
+ */
+trex_rpc_cmd_rc_e
+TrexRpcCmdPushRemote::_run(const Json::Value &params, Json::Value &result) {
+
+ uint8_t port_id = parse_port(params, result);
+ std::string pcap_filename = parse_string(params, "pcap_filename", result);
+ double ipg = parse_double(params, "ipg", result);
+ double speedup = parse_double(params, "speedup", result);
+ uint32_t count = parse_uint32(params, "count", result);
+
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+
+ try {
+ port->push_remote(pcap_filename, ipg, speedup, count);
+ } catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+
+ result["result"] = Json::objectValue;
+ return (TREX_RPC_CMD_OK);
+
+}
+
diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h
index 428bdd7b..99c83545 100644
--- a/src/rpc-server/commands/trex_rpc_cmds.h
+++ b/src/rpc-server/commands/trex_rpc_cmds.h
@@ -130,5 +130,7 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdUpdateTraffic, "update_traffic", 3, true, APIClass
TREX_RPC_CMD_DEFINE(TrexRpcCmdValidate, "validate", 2, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdPushRemote, "push_remote", 5, true, APIClass::API_CLASS_TYPE_CORE);
+
#endif /* __TREX_RPC_CMD_H__ */
diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp
index 924503f2..7104792e 100644
--- a/src/rpc-server/trex_rpc_cmds_table.cpp
+++ b/src/rpc-server/trex_rpc_cmds_table.cpp
@@ -65,6 +65,7 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() {
register_command(new TrexRpcCmdRemoveRXFilters());
register_command(new TrexRpcCmdValidate());
+ register_command(new TrexRpcCmdPushRemote());
}
diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp
index 1321a362..fc96e00a 100644
--- a/src/stateless/cp/trex_dp_port_events.cpp
+++ b/src/stateless/cp/trex_dp_port_events.cpp
@@ -78,6 +78,9 @@ protected:
virtual void on_event() {
/* do nothing */
}
+ virtual void on_error(int thread_id) {
+ /* do nothing */
+ }
};
void
@@ -105,14 +108,14 @@ TrexDpPortEvents::barrier() {
*
*/
void
-TrexDpPortEvents::on_core_reporting_in(int event_id, int thread_id) {
+TrexDpPortEvents::on_core_reporting_in(int event_id, int thread_id, bool status) {
TrexDpPortEvent *event = lookup(event_id);
/* event might have been deleted */
if (!event) {
return;
}
- bool done = event->on_core_reporting_in(thread_id);
+ bool done = event->on_core_reporting_in(thread_id, status);
if (done) {
destroy_event(event_id);
@@ -150,7 +153,7 @@ TrexDpPortEvent::init(TrexStatelessPort *port, int event_id, int timeout_ms) {
}
bool
-TrexDpPortEvent::on_core_reporting_in(int thread_id) {
+TrexDpPortEvent::on_core_reporting_in(int thread_id, bool status) {
/* mark sure no double signal */
if (m_signal.at(thread_id)) {
std::stringstream err;
@@ -163,6 +166,11 @@ TrexDpPortEvent::on_core_reporting_in(int thread_id) {
m_signal.at(thread_id) = true;
m_pending_cnt--;
+ /* if any core reported an error - mark as a failure */
+ if (!status) {
+ on_error(thread_id);
+ }
+
/* event occured */
if (m_pending_cnt == 0) {
on_event();
diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h
index 3b8c8633..681e47ab 100644
--- a/src/stateless/cp/trex_dp_port_events.h
+++ b/src/stateless/cp/trex_dp_port_events.h
@@ -48,13 +48,22 @@ protected:
*/
virtual void on_event() = 0;
+ /**
+ * when a thread ID encounter an error
+ *
+ * @author imarom (20-Apr-16)
+ *
+ * @param thread_id
+ */
+ virtual void on_error(int thread_id) = 0;
+
TrexStatelessPort *get_port() {
return m_port;
}
private:
void init(TrexStatelessPort *port, int event_id, int timeout_ms);
- bool on_core_reporting_in(int thread_id);
+ bool on_core_reporting_in(int thread_id, bool status = true);
std::unordered_map<int, bool> m_signal;
int m_pending_cnt;
@@ -98,7 +107,7 @@ public:
/**
* a core has reached the event
*/
- void on_core_reporting_in(int event_id, int thread_id);
+ void on_core_reporting_in(int event_id, int thread_id, bool status = true);
private:
TrexDpPortEvent *lookup(int event_id);
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 90142d9b..b09393f9 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -24,6 +24,7 @@ limitations under the License.
#include <trex_stateless_messaging.h>
#include <trex_streams_compiler.h>
#include <common/basic_utils.h>
+#include <common/captureFile.h>
#include <string>
@@ -70,6 +71,20 @@ protected:
assert(get_port()->m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID);
get_port()->m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
}
+
+ /**
+ * when a DP core encountered an error
+ *
+ * @author imarom (20-Apr-16)
+ */
+ virtual void on_error(int thread_id) {
+ Json::Value data;
+
+ data["port_id"] = get_port()->get_port_id();
+ data["thread_id"] = thread_id;
+
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_ERROR, data);
+ }
};
/***************************
@@ -395,6 +410,46 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul, bool force) {
}
+void
+TrexStatelessPort::push_remote(const std::string &pcap_filename, double ipg_usec, double speedup, uint32_t count) {
+ /* command allowed only on state stream */
+ verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS);
+
+ /* check that file exists */
+ CCapReaderBase *reader;
+ std::stringstream ss;
+ reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
+ if (!reader) {
+ throw TrexException(ss.str());
+ }
+ delete reader;
+
+ /* only one core gets to play */
+ int tx_core = m_cores_id_list[0];
+
+ /* create async event */
+ assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID);
+ m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent());
+
+ /* mark all other cores as done */
+ for (int index = 1; index < m_cores_id_list.size(); index++) {
+ /* mimic an end event */
+ m_dp_events.on_core_reporting_in(m_pending_async_stop_event, m_cores_id_list[index]);
+ }
+
+ /* send a message to core */
+ change_state(PORT_STATE_TX);
+ TrexStatelessCpToDpMsgBase *push_msg = new TrexStatelessDpPushPCAP(m_port_id,
+ m_pending_async_stop_event,
+ pcap_filename);
+ send_message_to_dp(tx_core, push_msg);
+
+ /* update subscribers */
+ Json::Value data;
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data);
+}
+
std::string
TrexStatelessPort::get_state_as_string() const {
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 520940d8..502c066d 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -212,6 +212,12 @@ public:
void update_traffic(const TrexPortMultiplier &mul, bool force);
/**
+ * push a PCAP file onto the port
+ *
+ */
+ void push_remote(const std::string &pcap_filename, double ipg_usec, double speedup, uint32_t count);
+
+ /**
* get the port state
*
*/
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index d3d49a34..31c907fa 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -262,6 +262,36 @@ bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
return (true);
}
+bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id, const std::string &pcap_filename){
+
+ /* push pcap can only happen on an idle port from the core prespective */
+ assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
+
+ CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
+ if (!pcap_node) {
+ return (false);
+ }
+
+ pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
+
+ uint8_t mac_addr[12];
+ m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
+
+ bool rc = pcap_node->create(port_id, pcap_filename, dir, mac_addr);
+ if (!rc) {
+ m_core->free_node((CGenNode *)pcap_node);
+ return (false);
+ }
+
+ /* schedule the node for now */
+ pcap_node->m_time = m_core->m_cur_time_sec;
+ m_core->m_node_gen.add_node((CGenNode *)pcap_node);
+
+ m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
+
+ return (true);
+}
+
bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
bool stop_on_id,
@@ -305,7 +335,6 @@ bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
void TrexStatelessDpPerPort::create(CFlowGenListPerThread * core){
m_core=core;
m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
- m_port_id=0;
m_active_streams=0;
m_active_nodes.clear();
}
@@ -579,6 +608,7 @@ void
TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
TrexStream * stream,
TrexStreamsCompiledObj *comp) {
+
CGenNodeStateless *node = m_core->create_node_sl();
/* add periodic */
@@ -834,6 +864,37 @@ TrexStatelessDpCore::pause_traffic(uint8_t port_id){
lp_port->pause_traffic(port_id);
}
+
+void
+TrexStatelessDpCore::push_pcap(uint8_t port_id, int event_id, const std::string &pcap_filename) {
+
+ TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
+
+ lp_port->set_event_id(event_id);
+
+ /* delegate the command to the port */
+ bool rc = lp_port->push_pcap(port_id, pcap_filename);
+ if (!rc) {
+ /* report back that we stopped */
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
+ TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
+ port_id,
+ event_id,
+ false);
+ ring->Enqueue((CGenNode *)event_msg);
+ return;
+ }
+
+ m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
+
+ #if 0
+ if ( duration > 0.0 ){
+ add_port_duration(duration, port_id, event_id);
+ }
+ #endif
+}
+
+
void
TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
@@ -895,3 +956,60 @@ TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
event_id);
ring->Enqueue((CGenNode *)event_msg);
}
+
+
+/**
+ * PCAP node
+ */
+bool CGenNodePCAP::create(uint8_t port_id, const std::string &pcap_filename, pkt_dir_t dir, const uint8_t *mac_addr) {
+ std::stringstream ss;
+
+ m_type = CGenNode::PCAP_PKT;
+ m_flags = 0;
+ m_src_port = 0;
+ m_port_id = port_id;
+
+ /* copy MAC addr info */
+ memcpy(m_mac_addr, mac_addr, 12);
+
+ /* set the dir */
+ set_mbuf_dir(dir);
+
+ /* create the PCAP reader */
+ m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
+ if (!m_reader) {
+ return false;
+ }
+
+ m_raw_packet = new CCapPktRaw();
+ if ( m_reader->ReadPacket(m_raw_packet) == false ){
+ /* handle error */
+ delete m_reader;
+ return (false);
+ }
+
+ /* this is the reference time */
+ //m_base_time = m_raw_packet->get_time();
+ m_last_pkt_time = m_raw_packet->get_time();
+
+ /* ready */
+ m_state = PCAP_ACTIVE;
+
+ return true;
+}
+
+void CGenNodePCAP::destroy() {
+
+ if (m_raw_packet) {
+ delete m_raw_packet;
+ m_raw_packet = NULL;
+ }
+
+ if (m_reader) {
+ delete m_reader;
+ m_reader = NULL;
+ }
+
+ m_state = PCAP_INVALID;
+}
+
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index cb102b8d..01033a7c 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -70,6 +70,8 @@ public:
bool update_traffic(uint8_t port_id, double factor);
+ bool push_pcap(uint8_t port_id, const std::string &pcap_filename);
+
bool stop_traffic(uint8_t port_id,
bool stop_on_id,
int event_id);
@@ -91,7 +93,6 @@ public:
public:
state_e m_state;
- uint8_t m_port_id;
uint32_t m_active_streams; /* how many active streams on this port */
@@ -149,7 +150,7 @@ public:
*/
void start_traffic(TrexStreamsCompiledObj *obj,
double duration,
- int m_event_id);
+ int event_id);
/* pause the streams, work only if all are continues */
@@ -161,6 +162,13 @@ public:
/**
+ * push a PCAP file on port
+ *
+ */
+ void push_pcap(uint8_t port_id, int event_id, const std::string &pcap_filename);
+
+
+ /**
* update current traffic rate
*
* @author imarom (25-Nov-15)
diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h
index c85bf8b5..8ccb5286 100644
--- a/src/stateless/dp/trex_stream_node.h
+++ b/src/stateless/dp/trex_stream_node.h
@@ -26,6 +26,8 @@ limitations under the License.
#include <stdio.h>
class TrexStatelessDpCore;
+class TrexStatelessDpPerPort;
+
#include <trex_stream.h>
class TrexStatelessCpToDpMsgBase;
@@ -387,6 +389,133 @@ private:
static_assert(sizeof(CGenNodeStateless) == sizeof(CGenNode), "sizeof(CGenNodeStateless) != sizeof(CGenNode)" );
+/* this is a event for PCAP transmitting */
+struct CGenNodePCAP : public CGenNodeBase {
+friend class TrexStatelessDpPerPort;
+
+public:
+
+ /**
+ * creates a node from a PCAP file
+ */
+ bool create(uint8_t port_id, const std::string &pcap_filename, pkt_dir_t dir, const uint8_t *mac_addr);
+ void destroy();
+
+ /**
+ * advance - will read the next packet
+ *
+ * @author imarom (03-May-16)
+ */
+ void next() {
+ assert(m_state == PCAP_ACTIVE);
+
+ /* save the previous packet time */
+ m_last_pkt_time = m_raw_packet->get_time();
+
+ /* advance */
+ if ( m_reader->ReadPacket(m_raw_packet) == false ){
+ m_state = PCAP_EOF;
+ return;
+ }
+
+ }
+
+ /**
+ * return true if the PCAP has next packet
+ *
+ */
+ bool has_next() {
+ assert(m_state != PCAP_INVALID);
+ return (m_state == PCAP_ACTIVE);
+ }
+
+ /**
+ * return the time for the next scheduling for a packet
+ *
+ */
+ inline double get_ipg() {
+ assert(m_state != PCAP_INVALID);
+ return m_raw_packet->get_time() - m_last_pkt_time;
+ //return 0.00001;
+ }
+
+ /**
+ * get the current packet as MBUF
+ *
+ */
+ inline rte_mbuf_t *get_pkt() {
+ assert(m_state != PCAP_INVALID);
+
+ rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), m_raw_packet->getTotalLen());
+ assert(m);
+
+ char *p = rte_pktmbuf_append(m, m_raw_packet->getTotalLen());
+ assert(p);
+ /* copy the packet */
+ memcpy(p, m_raw_packet->raw, m_raw_packet->getTotalLen());
+
+ /* fix the MAC */
+ memcpy(p, m_mac_addr, 12);
+
+ return (m);
+ }
+
+
+ inline void handle(CFlowGenListPerThread *thread) {
+ assert(m_state != PCAP_INVALID);
+ thread->m_node_gen.m_v_if->send_node( (CGenNode *)this);
+ }
+
+ void set_mbuf_dir(pkt_dir_t dir) {
+ if (dir) {
+ m_flags |=NODE_FLAGS_DIR;
+ }else{
+ m_flags &=~NODE_FLAGS_DIR;
+ }
+ }
+
+ inline pkt_dir_t get_mbuf_dir(){
+ return ((pkt_dir_t)( m_flags &1));
+ }
+
+ uint8_t get_port_id() {
+ return m_port_id;
+ }
+
+private:
+
+ enum {
+ PCAP_INVALID = 0,
+ PCAP_ACTIVE,
+ PCAP_EOF
+ };
+
+ /* cache line 0 */
+ /* important stuff here */
+ uint8_t m_mac_addr[12];
+ uint8_t m_state;
+
+ //double m_base_time;
+ //double m_current_pkt_time;
+ double m_last_pkt_time;
+
+ void * m_cache_mbuf;
+
+ double m_next_time_offset; /* in sec */
+
+ CCapReaderBase *m_reader;
+ CCapPktRaw *m_raw_packet;
+
+ uint8_t m_port_id;
+
+ /* pad to match the size of CGenNode */
+ uint8_t m_pad_end[25];
+
+} __rte_cache_aligned;
+
+
+static_assert(sizeof(CGenNodePCAP) == sizeof(CGenNode), "sizeof(CGenNodePCAP) != sizeof(CGenNode)" );
#endif /* __TREX_STREAM_NODE_H__ */
+
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index 7edf0f13..59e0a0a8 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -181,6 +181,24 @@ TrexStatelessDpUpdate::clone() {
return new_msg;
}
+
+/*************************
+ push PCAP message
+ ************************/
+bool
+TrexStatelessDpPushPCAP::handle(TrexStatelessDpCore *dp_core) {
+ dp_core->push_pcap(m_port_id, m_event_id, m_pcap_filename);
+ return true;
+}
+
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpPushPCAP::clone() {
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpPushPCAP(m_port_id, m_event_id, m_pcap_filename);
+
+ return new_msg;
+}
+
+
/*************************
barrier message
************************/
@@ -203,7 +221,7 @@ TrexStatelessDpBarrier::clone() {
bool
TrexDpPortEventMsg::handle() {
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(m_port_id);
- port->get_dp_events().on_core_reporting_in(m_event_id, m_thread_id);
+ port->get_dp_events().on_core_reporting_in(m_event_id, m_thread_id, get_status());
return (true);
}
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index 0eed01bd..8fb2a456 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -246,6 +246,29 @@ private:
double m_factor;
};
+
+/**
+ * psuh a PCAP message
+ */
+class TrexStatelessDpPushPCAP : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpPushPCAP(uint8_t port_id, int event_id, const std::string &pcap_filename) : m_pcap_filename(pcap_filename) {
+ m_port_id = port_id;
+ m_event_id = event_id;
+ }
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+private:
+ std::string m_pcap_filename;
+ uint8_t m_port_id;
+ int m_event_id;
+};
+
+
/**
* barrier message for DP core
*
@@ -267,6 +290,7 @@ private:
int m_event_id;
};
+
/************************* messages from DP to CP **********************/
/**
@@ -303,10 +327,11 @@ public:
class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase {
public:
- TrexDpPortEventMsg(int thread_id, uint8_t port_id, int event_id) {
- m_thread_id = thread_id;
- m_port_id = port_id;
- m_event_id = event_id;
+ TrexDpPortEventMsg(int thread_id, uint8_t port_id, int event_id, bool status = true) {
+ m_thread_id = thread_id;
+ m_port_id = port_id;
+ m_event_id = event_id;
+ m_status = status;
}
virtual bool handle();
@@ -323,10 +348,15 @@ public:
return m_event_id;
}
+ bool get_status() {
+ return m_status;
+ }
+
private:
int m_thread_id;
uint8_t m_port_id;
int m_event_id;
+ bool m_status;
};