summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorHanoh Haim <hhaim@cisco.com>2016-05-10 17:49:25 +0300
committerHanoh Haim <hhaim@cisco.com>2016-05-10 17:49:25 +0300
commit2d37b9f98020a4458aaad1f3fd05ca5e408213e0 (patch)
tree3a8cd16eb748711b72df37c6f7eea4842d73290a /src
parent996f2451dba01f534420418eaac2856510682757 (diff)
parent63bf6aba10075a03fe6609369c1c7008afb85ba7 (diff)
merge from master
Diffstat (limited to 'src')
-rwxr-xr-xsrc/bp_sim.cpp413
-rwxr-xr-xsrc/bp_sim.h35
-rwxr-xr-xsrc/common/captureFile.cpp16
-rwxr-xr-xsrc/common/captureFile.h6
-rwxr-xr-xsrc/common/pcap.cpp3
-rw-r--r--src/gtest/trex_stateless_gtest.cpp64
-rw-r--r--src/main_dpdk.cpp242
-rw-r--r--src/publisher/trex_publisher.h1
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_general.cpp29
-rw-r--r--src/rpc-server/commands/trex_rpc_cmds.h2
-rw-r--r--src/rpc-server/trex_rpc_cmds_table.cpp1
-rw-r--r--src/stateless/cp/trex_dp_port_events.cpp14
-rw-r--r--src/stateless/cp/trex_dp_port_events.h13
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp78
-rw-r--r--src/stateless/cp/trex_stateless_port.h27
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp192
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h29
-rw-r--r--src/stateless/dp/trex_stream_node.h172
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp32
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h52
20 files changed, 1022 insertions, 399 deletions
diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp
index 76bd6ec7..f9cb3220 100755
--- a/src/bp_sim.cpp
+++ b/src/bp_sim.cpp
@@ -3858,187 +3858,86 @@ int CNodeGenerator::flush_file(dsec_t max_time,
-#if 0
-int CNodeGenerator::flush_file(dsec_t max_time,
- dsec_t d_time,
- bool always,
- CFlowGenListPerThread * thread,
- double &old_offset){
- CGenNode * node;
- #ifdef TREX_SIM
- dsec_t flush_time=now_sec();
- #endif
- dsec_t offset=0.0;
- #ifdef TREX_SIM
- dsec_t n_time;
- #endif
- if (always) {
- offset=old_offset;
- }
- #ifdef TREX_SIM
- uint32_t events=0;
- #endif
- bool done=false;
-
- thread->m_cpu_dp_u.start_work1();
-
- /**
- * if a positive value was given to max time
- * schedule an exit node
- */
- if ( (max_time > 0) && (!always) ) {
- CGenNode *exit_node = thread->create_node();
-
- exit_node->m_type = CGenNode::EXIT_SCHED;
- exit_node->m_time = max_time;
- add_node(exit_node);
- }
-
- while (true) {
-
- node = m_p_queue.top();
- #ifdef TREX_SIM
- n_time = node->m_time + offset;
-
- events++;
+void CNodeGenerator::handle_flow_pkt(CGenNode *node, CFlowGenListPerThread *thread) {
+
+ /*repeat and NAT is not supported */
+ if ( node->is_nat_first_state() ) {
+ node->set_nat_wait_state();
+ flush_one_node_to_file(node);
+ #ifdef _DEBUG
+ update_stats(node);
#endif
-/*#ifdef VALG
- if (events > 1 ) {
- CALLGRIND_START_INSTRUMENTATION;
- }
-#endif*/
-
- thread->m_cpu_dp_u.commit1();
- thread->m_cpu_dp_u.start_work1();
-
- #ifdef TREX_SIM
-
- if ( likely ( m_is_realtime ) ){
- dsec_t dt ;
- thread->m_cpu_dp_u.commit1();
-
- while ( true ) {
- dt = now_sec() - n_time ;
-
- if (dt> (-0.00003)) {
- break;
- }
-
- rte_pause();
- }
- thread->m_cpu_dp_u.start_work1();
+ } else {
+ if ( node->is_nat_wait_state() ) {
+ if (node->is_responder_pkt()) {
+ m_p_queue.pop();
+ /* time out, need to free the flow and remove the association , we didn't get convertion yet*/
+ thread->terminate_nat_flows(node);
+ return;
- /* add offset in case of faliures more than 100usec */
- if ( unlikely( dt > 0.000100 ) ) {
- offset += dt;
- }
- /* update histogram */
- if ( unlikely( events % 16 ) ==0 ) {
- m_realtime_his.Add(dt);
- }
- /* flush evey 10 usec */
- if ( now_sec() - flush_time > 0.00001 ){
- m_v_if->flush_tx_queue();
- flush_time=now_sec();
+ } else {
+ flush_one_node_to_file(node);
+ #ifdef _DEBUG
+ update_stats(node);
+ #endif
}
+ } else {
+ assert(0);
}
- #endif
+ }
+ m_p_queue.pop();
+ if ( node->is_last_in_flow() ) {
+ thread->free_last_flow_node( node);
+ } else {
+ node->update_next_pkt_in_flow();
+ m_p_queue.push(node);
+ }
+}
+void CNodeGenerator::handle_flow_sync(CGenNode *node, CFlowGenListPerThread *thread, bool &exit_scheduler) {
+ /* flow sync message is a sync point for time */
+ thread->m_cur_time_sec = node->m_time;
- uint8_t type=node->m_type;
+ /* first pop the node */
+ m_p_queue.pop();
- if ( type == CGenNode::STATELESS_PKT ) {
- m_p_queue.pop();
- CGenNodeStateless *node_sl = (CGenNodeStateless *)node;
+ thread->check_msgs(); /* check messages */
+ m_v_if->flush_tx_queue(); /* flush pkt each timeout */
- /* if the stream has been deactivated - end */
- if ( unlikely( node_sl->is_mask_for_free() ) ) {
- thread->free_node(node);
- } else {
+ /* exit in case this is the last node*/
+ if ( m_p_queue.size() == m_parent->m_non_active_nodes ) {
+ thread->free_node(node);
+ exit_scheduler = true;
+ } else {
+ /* schedule for next maintenace */
+ node->m_time += SYNC_TIME_OUT;
+ m_p_queue.push(node);
+ }
- /* count before handle - node might be destroyed */
- #ifdef TREX_SIM
- update_stl_stats(node_sl);
- #endif
+}
- node_sl->handle(thread);
+void CNodeGenerator::handle_command(CGenNode *node, CFlowGenListPerThread *thread, bool &exit_scheduler) {
+ m_p_queue.pop();
+ CGenNodeCommand *node_cmd = (CGenNodeCommand *)node;
+ TrexStatelessCpToDpMsgBase * cmd=node_cmd->m_cmd;
+ cmd->handle(&thread->m_stateless_dp_info);
+ exit_scheduler = cmd->is_quit();
+ thread->free_node((CGenNode *)node_cmd);/* free the node */
+}
- #ifdef TREX_SIM
- if (has_limit_reached()) {
- thread->m_stateless_dp_info.stop_traffic(node_sl->get_port_id(), false, 0);
- }
- #endif
+void CNodeGenerator::handle_pcap_pkt(CGenNode *node, CFlowGenListPerThread *thread) {
+ m_p_queue.pop();
- }
-
-
- }else{
- if ( likely( type == CGenNode::FLOW_PKT ) ) {
- /* PKT */
- if ( !(node->is_repeat_flow()) || (always==false)) {
- flush_one_node_to_file(node);
- #ifdef _DEBUG
- update_stats(node);
- #endif
- }
- m_p_queue.pop();
- if ( node->is_last_in_flow() ) {
- if ((node->is_repeat_flow()) && (always==false)) {
- /* Flow is repeated, reschedule it */
- thread->reschedule_flow( node);
- }else{
- /* Flow will not be repeated, so free node */
- thread->free_last_flow_node( node);
- }
- }else{
- node->update_next_pkt_in_flow();
- m_p_queue.push(node);
- }
- }else{
- if ((type == CGenNode::FLOW_FIF)) {
- /* callback to our method */
- m_p_queue.pop();
- if ( always == false) {
- thread->m_cur_time_sec = node->m_time ;
-
- if ( thread->generate_flows_roundrobin(&done) <0){
- break;
- }
- if (!done) {
- node->m_time +=d_time;
- m_p_queue.push(node);
- }else{
- thread->free_node(node);
- }
- }else{
- thread->free_node(node);
- }
-
- }else{
- bool exit_sccheduler = handle_slow_messages(type,node,thread,always);
- if (exit_sccheduler) {
- break;
- }
- }
- }
- }
- }
+ CGenNodePCAP *node_pcap = (CGenNodePCAP *)node;
- if ( thread->is_terminated_by_master() ) {
- return (0);
- }
-
- if (!always) {
- old_offset =offset;
- }else{
- // free the left other
- thread->handler_defer_job_flush();
+ /* might have been marked for free */
+ if ( unlikely( node_pcap->is_marked_for_free() ) ) {
+ thread->free_node(node);
+ } else {
+ node_pcap->handle(thread);
}
- return (0);
}
-#endif
-
bool
CNodeGenerator::handle_slow_messages(uint8_t type,
CGenNode * node,
@@ -4048,89 +3947,42 @@ CNodeGenerator::handle_slow_messages(uint8_t type,
/* should we continue after */
bool exit_scheduler = false;
- if (unlikely (type == CGenNode::FLOW_DEFER_PORT_RELEASE) ) {
+ switch (type) {
+ case CGenNode::PCAP_PKT:
+ handle_pcap_pkt(node, thread);
+ break;
+
+ case CGenNode::FLOW_DEFER_PORT_RELEASE:
m_p_queue.pop();
thread->handler_defer_job(node);
thread->free_node(node);
+ break;
- } else if (type == CGenNode::FLOW_PKT_NAT) {
- /*repeat and NAT is not supported */
- if ( node->is_nat_first_state() ){
- node->set_nat_wait_state();
- flush_one_node_to_file(node);
- #ifdef _DEBUG
- update_stats(node);
- #endif
- }else{
- if ( node->is_nat_wait_state() ) {
- if (node->is_responder_pkt()) {
- m_p_queue.pop();
- /* time out, need to free the flow and remove the association , we didn't get convertion yet*/
- thread->terminate_nat_flows(node);
- return (exit_scheduler);
-
- }else{
- flush_one_node_to_file(node);
- #ifdef _DEBUG
- update_stats(node);
- #endif
- }
- }else{
- assert(0);
- }
- }
- m_p_queue.pop();
- if ( node->is_last_in_flow() ) {
- thread->free_last_flow_node( node);
- }else{
- node->update_next_pkt_in_flow();
- m_p_queue.push(node);
- }
-
- } else if ( type == CGenNode::FLOW_SYNC ) {
+ case CGenNode::FLOW_PKT_NAT:
+ handle_flow_pkt(node, thread);
+ break;
- /* flow sync message is a sync point for time */
- thread->m_cur_time_sec = node->m_time;
+ case CGenNode::FLOW_SYNC:
+ handle_flow_sync(node, thread, exit_scheduler);
+ break;
- /* first pop the node */
- m_p_queue.pop();
+ case CGenNode::EXIT_SCHED:
+ m_p_queue.pop();
+ thread->free_node(node);
+ exit_scheduler = true;
+ break;
- thread->check_msgs(); /* check messages */
- m_v_if->flush_tx_queue(); /* flush pkt each timeout */
- /* exit in case this is the last node*/
- if ( m_p_queue.size() == m_parent->m_non_active_nodes ) {
- thread->free_node(node);
- exit_scheduler = true;
- } else {
- /* schedule for next maintenace */
- node->m_time += SYNC_TIME_OUT;
- m_p_queue.push(node);
- }
+ case CGenNode::COMMAND:
+ handle_command(node, thread, exit_scheduler);
+ break;
+ default:
+ assert(0);
+ }
- } else if ( type == CGenNode::EXIT_SCHED ) {
- m_p_queue.pop();
- thread->free_node(node);
- exit_scheduler = true;
-
- } else {
- if ( type == CGenNode::COMMAND) {
- m_p_queue.pop();
- CGenNodeCommand *node_cmd = (CGenNodeCommand *)node;
- {
- TrexStatelessCpToDpMsgBase * cmd=node_cmd->m_cmd;
- cmd->handle(&thread->m_stateless_dp_info);
- exit_scheduler = cmd->is_quit();
- thread->free_node((CGenNode *)node_cmd);/* free the node */
- }
- }else{
- printf(" ERROR type is not valid %d \n",type);
- assert(0);
- }
- }
+ return (exit_scheduler);
- return exit_scheduler;
}
@@ -4408,8 +4260,6 @@ void CFlowGenListPerThread::check_msgs(void) {
}
}
-//void delay(int msec);
-
void CFlowGenListPerThread::start_stateless_simulation_file(std::string erf_file_name,
@@ -5123,39 +4973,66 @@ int CErfIFStl::update_mac_addr_from_global_cfg(pkt_dir_t dir, uint8_t * p){
}
-int CErfIFStl::send_node(CGenNode * _no_to_use){
-
- if ( m_preview_mode->getFileWrite() ){
-
- CGenNodeStateless * node_sl=(CGenNodeStateless *) _no_to_use;
+int CErfIFStl::send_sl_node(CGenNodeStateless *node_sl) {
+ pkt_dir_t dir=(pkt_dir_t)node_sl->get_mbuf_cache_dir();
- pkt_dir_t dir=(pkt_dir_t)node_sl->get_mbuf_cache_dir();
-
- rte_mbuf_t * m;
- if ( likely(node_sl->is_cache_mbuf_array()) ) {
- m=node_sl->cache_mbuf_array_get_cur();
- fill_raw_packet(m,_no_to_use,dir);
+ rte_mbuf_t * m;
+ if ( likely(node_sl->is_cache_mbuf_array()) ) {
+ m=node_sl->cache_mbuf_array_get_cur();
+ fill_raw_packet(m,(CGenNode *)node_sl,dir);
+ }else{
+ m=node_sl->get_cache_mbuf();
+ if (m) {
+ /* cache packet */
+ fill_raw_packet(m,(CGenNode *)node_sl,dir);
+ /* can't free the m, it is cached*/
}else{
- m=node_sl->get_cache_mbuf();
- if (m) {
- /* cache packet */
- fill_raw_packet(m,_no_to_use,dir);
- /* can't free the m, it is cached*/
- }else{
-
- m=node_sl->alloc_node_with_vm();
- assert(m);
- fill_raw_packet(m,_no_to_use,dir);
- rte_pktmbuf_free(m);
- }
+ m=node_sl->alloc_node_with_vm();
+ assert(m);
+ fill_raw_packet(m,(CGenNode *)node_sl,dir);
+ rte_pktmbuf_free(m);
}
+ }
/* check that we have mbuf */
+ int rc = write_pkt(m_raw);
+ BP_ASSERT(rc == 0);
+
+ return (rc);
+}
- int rc = write_pkt(m_raw);
- BP_ASSERT(rc == 0);
+
+int CErfIFStl::send_pcap_node(CGenNodePCAP *pcap_node) {
+ rte_mbuf_t *m = pcap_node->get_pkt();
+ if (!m) {
+ return (-1);
}
+ pkt_dir_t dir = (pkt_dir_t)pcap_node->get_mbuf_dir();
+ fill_raw_packet(m, (CGenNode*)pcap_node, dir);
+ rte_pktmbuf_free(m);
+
+ int rc = write_pkt(m_raw);
+ BP_ASSERT(rc == 0);
+
+ return (rc);
+}
+
+int CErfIFStl::send_node(CGenNode * _no_to_use){
+
+ if ( m_preview_mode->getFileWrite() ) {
+
+ switch (_no_to_use->m_type) {
+ case CGenNode::STATELESS_PKT:
+ return send_sl_node((CGenNodeStateless *) _no_to_use);
+
+ case CGenNode::PCAP_PKT:
+ return send_pcap_node((CGenNodePCAP *) _no_to_use);
+
+ default:
+ assert(0);
+ }
+ }
return (0);
}
@@ -6523,10 +6400,18 @@ void CGenNodeBase::free_base(){
CGenNodeStateless* p=(CGenNodeStateless*)this;
p->free_stl_node();
return;
+ }
+
+ if (m_type == PCAP_PKT) {
+ CGenNodePCAP *p = (CGenNodePCAP *)this;
+ p->destroy();
+ return;
}
+
if ( m_type == COMMAND ) {
CGenNodeCommand* p=(CGenNodeCommand*)this;
p->free_command();
}
}
+
diff --git a/src/bp_sim.h b/src/bp_sim.h
index 0a7e8bda..bdca7703 100755
--- a/src/bp_sim.h
+++ b/src/bp_sim.h
@@ -61,6 +61,8 @@ limitations under the License.
#include <trex_stateless_dp_core.h>
+class CGenNodePCAP;
+
#undef NAT_TRACE_
#define FORCE_NO_INLINE __attribute__ ((noinline))
@@ -1423,7 +1425,9 @@ public:
EXIT_SCHED =6,
COMMAND =7,
- EXIT_PORT_SCHED =8
+ EXIT_PORT_SCHED =8,
+
+ PCAP_PKT =9,
};
@@ -1441,6 +1445,7 @@ public:
NODE_FLAGS_INIT_START_FROM_SERVER_SIDE = 0x40,
NODE_FLAGS_ALL_FLOW_SAME_PORT_SIDE = 0x80,
NODE_FLAGS_INIT_START_FROM_SERVER_SIDE_SERVER_ADDR = 0x100, /* init packet start from server side with server addr */
+ NODE_FLAGS_SLOW_PATH = 0x200 /* used by the nodes to differ between fast path nodes and slow path nodes */
};
@@ -1479,6 +1484,17 @@ public:
return ( m_socket_id );
}
+ inline void set_slow_path(bool enable) {
+ if (enable) {
+ m_flags |= NODE_FLAGS_SLOW_PATH;
+ } else {
+ m_flags &= ~NODE_FLAGS_SLOW_PATH;
+ }
+ }
+
+ inline bool get_is_slow_path() const {
+ return ( (m_flags & NODE_FLAGS_SLOW_PATH) ? true : false);
+ }
void free_base();
};
@@ -1581,6 +1597,7 @@ public:
return ( (m_flags &NODE_FLAGS_ALL_FLOW_SAME_PORT_SIDE)?true:false );
}
+
/* direction for ip addr */
inline pkt_dir_t cur_pkt_ip_addr_dir();
@@ -1873,6 +1890,9 @@ public:
virtual pkt_dir_t port_id_to_dir(uint8_t port_id);
+private:
+ int send_sl_node(CGenNodeStateless * node_sl);
+ int send_pcap_node(CGenNodePCAP * pcap_node);
};
@@ -2085,7 +2105,11 @@ private:
bool always,
CFlowGenListPerThread * thread,
double &old_offset);
-
+private:
+ void handle_command(CGenNode *node, CFlowGenListPerThread *thread, bool &exit_scheduler);
+ void handle_flow_pkt(CGenNode *node, CFlowGenListPerThread *thread);
+ void handle_flow_sync(CGenNode *node, CFlowGenListPerThread *thread, bool &exit_scheduler);
+ void handle_pcap_pkt(CGenNode *node, CFlowGenListPerThread *thread);
public:
pqueue_t m_p_queue;
@@ -3628,10 +3652,14 @@ public :
inline CGenNode * create_node(void);
+
inline CGenNodeStateless * create_node_sl(void){
return ((CGenNodeStateless*)create_node() );
}
+ inline CGenNodePCAP * allocate_pcap_node(void) {
+ return ((CGenNodePCAP*)create_node());
+ }
inline void free_node(CGenNode *p);
inline void free_last_flow_node(CGenNode *p);
@@ -3653,6 +3681,9 @@ public:
bool set_stateless_next_node( CGenNodeStateless * cur_node,
CGenNodeStateless * next_node);
+ void stop_stateless_traffic(uint8_t port_id) {
+ m_stateless_dp_info.stop_traffic(port_id, false, 0);
+ }
void Dump(FILE *fd);
void DumpCsv(FILE *fd);
diff --git a/src/common/captureFile.cpp b/src/common/captureFile.cpp
index e73c37ad..4c50bcb2 100755
--- a/src/common/captureFile.cpp
+++ b/src/common/captureFile.cpp
@@ -244,28 +244,23 @@ bool CErfCmp::compare(std::string f1, std::string f2 ){
return (res);
}
-
-
/**
* try to create type by type
* @param name
*
* @return CCapReaderBase*
*/
-CCapReaderBase * CCapReaderFactory::CreateReader(char * name, int loops)
+CCapReaderBase * CCapReaderFactory::CreateReader(char * name, int loops, std::ostream &err)
{
- if (name == NULL) {
- printf("Got null file name\n");
- return NULL;
- }
+ assert(name);
/* make sure we have a file */
FILE * f = CAP_FOPEN_64(name, "rb");
if (f == NULL) {
if (errno == ENOENT) {
- printf("\nERROR: Cap file not found %s\n\n",name);
+ err << "CAP file '" << name << "' not found";
} else {
- printf("\nERROR: Failed to open cap file '%s' with errno %d\n\n", name, errno);
+ err << "failed to open CAP file '" << name << "' with errno " << errno;
}
return NULL;
}
@@ -281,8 +276,7 @@ CCapReaderBase * CCapReaderFactory::CreateReader(char * name, int loops)
delete next;
}
- printf("\nERROR: file %s format not supported",name);
- printf("\nERROR: formats supported are LIBPCAP and ERF. other formats are deprecated\n\n");
+ err << "unsupported CAP format (not PCAP or ERF): " << name << "\n";
return NULL;
}
diff --git a/src/common/captureFile.h b/src/common/captureFile.h
index 3be83432..32b98272 100755
--- a/src/common/captureFile.h
+++ b/src/common/captureFile.h
@@ -24,6 +24,8 @@ limitations under the License.
#include <math.h>
#include <stdlib.h>
#include <string>
+#include <iostream>
+
#ifdef WIN32
#pragma warning(disable:4786)
#endif
@@ -201,11 +203,13 @@ public:
* @param name - cature file name
* @param loops - number of loops for the same capture. use 0
* for one time transmition
+ * @param err - IO stream to print error
+ *
* @return CCapReaderBase* - pointer to new instance (allocated
* by the function). the user should release the
* instance once it has no use any more.
*/
- static CCapReaderBase * CreateReader(char * name, int loops = 0);
+ static CCapReaderBase * CreateReader(char * name, int loops = 0, std::ostream &err = std::cout);
private:
diff --git a/src/common/pcap.cpp b/src/common/pcap.cpp
index 9b360a3e..a68fb620 100755
--- a/src/common/pcap.cpp
+++ b/src/common/pcap.cpp
@@ -156,8 +156,7 @@ bool LibPCapReader::ReadPacket(CCapPktRaw *lpPacket)
}
if (pkt_header.len > READER_MAX_PACKET_SIZE) {
/* cannot read this packet */
- printf("ERROR packet is too big, bigger than %d \n",READER_MAX_PACKET_SIZE);
- exit(-1);
+ //printf("ERROR packet is too big, bigger than %d \n",READER_MAX_PACKET_SIZE);
return false;
}
diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp
index 11070f5c..50f8e5ec 100644
--- a/src/gtest/trex_stateless_gtest.cpp
+++ b/src/gtest/trex_stateless_gtest.cpp
@@ -3611,6 +3611,70 @@ TEST_F(basic_stl, vm_split_client_var) {
}
+TEST_F(basic_stl, pcap_remote_basic) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/pcap_remote_basic";
+
+ TrexStatelessCpToDpMsgBase *push_msg = new TrexStatelessDpPushPCAP(0,
+ 0,
+ "exp/remote_test.cap",
+ 10,
+ 1,
+ 1,
+ -1);
+ t1.m_msg = push_msg;
+
+ bool res = t1.init();
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+TEST_F(basic_stl, pcap_remote_loop) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/pcap_remote_loop";
+
+ TrexStatelessCpToDpMsgBase *push_msg = new TrexStatelessDpPushPCAP(0,
+ 0,
+ "exp/remote_test.cap",
+ 1,
+ 1,
+ 3,
+ -1);
+ t1.m_msg = push_msg;
+
+ bool res = t1.init();
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+TEST_F(basic_stl, pcap_remote_duration) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/pcap_remote_duration";
+
+ TrexStatelessCpToDpMsgBase *push_msg = new TrexStatelessDpPushPCAP(0,
+ 0,
+ "exp/remote_test.cap",
+ 100000,
+ 1,
+ 0,
+ 0.5);
+ t1.m_msg = push_msg;
+
+ bool res = t1.init();
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
/********************************************* Itay Tests End *************************************/
class rx_stat_pkt_parse : public testing::Test {
protected:
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index 3fa3ca68..092bd133 100644
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -1773,6 +1773,9 @@ protected:
class CCoreEthIFStateless : public CCoreEthIF {
public:
virtual int send_node(CGenNode * node);
+protected:
+ int handle_slow_path_node(CGenNode *node);
+ int send_pcap_node(CGenNodePCAP *pcap_node);
};
bool CCoreEthIF::Create(uint8_t core_id,
@@ -1991,7 +1994,14 @@ void CCoreEthIF::update_mac_addr(CGenNode * node,uint8_t *p){
-int CCoreEthIFStateless::send_node(CGenNode * no){
+int CCoreEthIFStateless::send_node(CGenNode * no) {
+
+ /* if a node is marked as slow path - single IF to redirect it to slow path */
+ if (no->get_is_slow_path()) {
+ return handle_slow_path_node(no);
+ }
+
+
CGenNodeStateless * node_sl=(CGenNodeStateless *) no;
/* check that we have mbuf */
rte_mbuf_t * m;
@@ -2027,6 +2037,33 @@ int CCoreEthIFStateless::send_node(CGenNode * no){
return (0);
};
+int CCoreEthIFStateless::send_pcap_node(CGenNodePCAP *pcap_node) {
+ rte_mbuf_t *m = pcap_node->get_pkt();
+ if (!m) {
+ return (-1);
+ }
+
+ pkt_dir_t dir = (pkt_dir_t)pcap_node->get_mbuf_dir();
+ CCorePerPort *lp_port=&m_ports[dir];
+ CVirtualIFPerSideStats *lp_stats = &m_stats[dir];
+
+ send_pkt(lp_port, m, lp_stats);
+
+ return (0);
+}
+
+/**
+ * slow path code goes here
+ *
+ */
+int CCoreEthIFStateless::handle_slow_path_node(CGenNode * no) {
+
+ if (no->m_type == CGenNode::PCAP_PKT) {
+ return send_pcap_node((CGenNodePCAP *)no);
+ }
+
+ return (-1);
+}
int CCoreEthIF::send_node(CGenNode * node){
@@ -2649,6 +2686,10 @@ public:
}
int run_in_rx_core();
int run_in_master();
+
+ bool handle_fast_path();
+ bool handle_slow_path(bool &was_stopped);
+
int stop_master();
/* return the minimum number of dp cores needed to support the active ports
this is for c==1 or m_cores_mul==1
@@ -3693,125 +3734,156 @@ CGlobalTRex::publish_async_barrier(uint32_t key) {
m_zmq_publisher.publish_barrier(key);
}
-int CGlobalTRex::run_in_master() {
- bool was_stopped=false;
-
- if ( get_is_stateless() ) {
- m_trex_stateless->launch_control_plane();
- }
-
- /* exception and scope safe */
- std::unique_lock<std::mutex> cp_lock(m_cp_lock);
-
- while ( true ) {
- m_stats_cnt+=1;
+bool
+CGlobalTRex::handle_slow_path(bool &was_stopped) {
+ m_stats_cnt+=1;
- if ( CGlobalInfo::m_options.preview.get_no_keyboard() ==false ){
- if ( m_io_modes.handle_io_modes() ){
- was_stopped=true;
- break;
- }
- }
- if ( sanity_check() ){
- printf(" Test was stopped \n");
+ if ( CGlobalInfo::m_options.preview.get_no_keyboard() ==false ) {
+ if ( m_io_modes.handle_io_modes() ) {
was_stopped=true;
- break;
+ return false;
}
- if (m_io_modes.m_g_mode != CTrexGlobalIoMode::gDISABLE ) {
+ }
+
+ if ( sanity_check() ) {
+ printf(" Test was stopped \n");
+ was_stopped=true;
+ return false;
+ }
+ if (m_io_modes.m_g_mode != CTrexGlobalIoMode::gDISABLE ) {
+ fprintf(stdout,"\033[2J");
+ fprintf(stdout,"\033[2H");
+
+ } else {
+ if ( m_io_modes.m_g_disable_first ) {
+ m_io_modes.m_g_disable_first=false;
fprintf(stdout,"\033[2J");
fprintf(stdout,"\033[2H");
-
- }else{
- if ( m_io_modes.m_g_disable_first ){
- m_io_modes.m_g_disable_first=false;
- fprintf(stdout,"\033[2J");
- fprintf(stdout,"\033[2H");
- printf("clean !!!\n");
- fflush(stdout);
- }
+ printf("clean !!!\n");
+ fflush(stdout);
}
+ }
- if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gHELP ) {
- m_io_modes.DumpHelp(stdout);
- }
+ if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gHELP ) {
+ m_io_modes.DumpHelp(stdout);
+ }
- dump_stats(stdout,CGlobalStats::dmpTABLE);
+ dump_stats(stdout,CGlobalStats::dmpTABLE);
- if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ) {
- fprintf (stdout," current time : %.1f sec \n",now_sec());
- float d= CGlobalInfo::m_options.m_duration - now_sec();
- if (d<0) {
- d=0;
+ if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ) {
+ fprintf (stdout," current time : %.1f sec \n",now_sec());
+ float d= CGlobalInfo::m_options.m_duration - now_sec();
+ if (d<0) {
+ d=0;
- }
- fprintf (stdout," test duration : %.1f sec \n",d);
}
+ fprintf (stdout," test duration : %.1f sec \n",d);
+ }
- if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gMem) {
+ if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gMem) {
- if ( m_stats_cnt%4==0){
- fprintf (stdout," %s \n",CGlobalInfo::dump_pool_as_json().c_str());
- }
+ if ( m_stats_cnt%4==0) {
+ fprintf (stdout," %s \n",CGlobalInfo::dump_pool_as_json().c_str());
}
+ }
- if ( CGlobalInfo::m_options.is_rx_enabled() ){
- m_mg.update();
+ if ( CGlobalInfo::m_options.is_rx_enabled() ) {
+ m_mg.update();
+
+ if ( m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ) {
+ switch (m_io_modes.m_l_mode) {
+ case CTrexGlobalIoMode::lDISABLE:
+ fprintf(stdout,"\n+Latency stats disabled \n");
+ break;
+ case CTrexGlobalIoMode::lENABLE:
+ fprintf(stdout,"\n-Latency stats enabled \n");
+ m_mg.DumpShort(stdout);
+ break;
+ case CTrexGlobalIoMode::lENABLE_Extended:
+ fprintf(stdout,"\n-Latency stats extended \n");
+ m_mg.Dump(stdout);
+ break;
+ }
- if ( m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ){
- switch (m_io_modes.m_l_mode) {
- case CTrexGlobalIoMode::lDISABLE:
- fprintf(stdout,"\n+Latency stats disabled \n");
+ if ( get_is_rx_check_mode() ) {
+
+ switch (m_io_modes.m_rc_mode) {
+ case CTrexGlobalIoMode::rcDISABLE:
+ fprintf(stdout,"\n+Rx Check stats disabled \n");
break;
- case CTrexGlobalIoMode::lENABLE:
- fprintf(stdout,"\n-Latency stats enabled \n");
- m_mg.DumpShort(stdout);
+ case CTrexGlobalIoMode::rcENABLE:
+ fprintf(stdout,"\n-Rx Check stats enabled \n");
+ m_mg.DumpShortRxCheck(stdout);
break;
- case CTrexGlobalIoMode::lENABLE_Extended:
- fprintf(stdout,"\n-Latency stats extended \n");
- m_mg.Dump(stdout);
+ case CTrexGlobalIoMode::rcENABLE_Extended:
+ fprintf(stdout,"\n-Rx Check stats enhanced \n");
+ m_mg.DumpRxCheck(stdout);
break;
}
- if ( get_is_rx_check_mode() ) {
-
- switch (m_io_modes.m_rc_mode) {
- case CTrexGlobalIoMode::rcDISABLE:
- fprintf(stdout,"\n+Rx Check stats disabled \n");
- break;
- case CTrexGlobalIoMode::rcENABLE:
- fprintf(stdout,"\n-Rx Check stats enabled \n");
- m_mg.DumpShortRxCheck(stdout);
- break;
- case CTrexGlobalIoMode::rcENABLE_Extended:
- fprintf(stdout,"\n-Rx Check stats enhanced \n");
- m_mg.DumpRxCheck(stdout);
- break;
- }
-
- }
-
}
+
}
+ }
+ /* publish data */
+ publish_async_data(false);
+ return true;
+}
- /* publish data */
- publish_async_data(false);
- /* check from messages from DP */
- check_for_dp_messages();
+bool
+CGlobalTRex::handle_fast_path() {
+ /* check from messages from DP */
+ check_for_dp_messages();
- cp_lock.unlock();
- delay(500);
- cp_lock.lock();
+ if ( is_all_cores_finished() ) {
+ return false;
+ }
+
+ return true;
+}
+
+int CGlobalTRex::run_in_master() {
+ bool was_stopped=false;
- if ( is_all_cores_finished() ) {
+ if ( get_is_stateless() ) {
+ m_trex_stateless->launch_control_plane();
+ }
+
+ /* exception and scope safe */
+ std::unique_lock<std::mutex> cp_lock(m_cp_lock);
+
+ uint32_t slow_path_counter = 0;
+
+ const int FASTPATH_DELAY_MS = 20;
+ const int SLOWPATH_DELAY_MS = 500;
+
+ while ( true ) {
+
+ /* fast path */
+ if (!handle_fast_path()) {
break;
}
+
+ /* slow path */
+ if (slow_path_counter >= SLOWPATH_DELAY_MS) {
+ if (!handle_slow_path(was_stopped)) {
+ break;
+ }
+ slow_path_counter = 0;
+ }
+
+
+ cp_lock.unlock();
+ delay(FASTPATH_DELAY_MS);
+ slow_path_counter += FASTPATH_DELAY_MS;
+ cp_lock.lock();
}
/* on exit release the lock */
diff --git a/src/publisher/trex_publisher.h b/src/publisher/trex_publisher.h
index f8843758..1d283478 100644
--- a/src/publisher/trex_publisher.h
+++ b/src/publisher/trex_publisher.h
@@ -48,6 +48,7 @@ public:
EVENT_PORT_FINISHED_TX = 4,
EVENT_PORT_ACQUIRED = 5,
EVENT_PORT_RELEASED = 6,
+ EVENT_PORT_ERROR = 7,
EVENT_SERVER_STOPPED = 100,
diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
index f7a23188..27376fe4 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
@@ -451,3 +451,32 @@ TrexRpcPublishNow::_run(const Json::Value &params, Json::Value &result) {
return (TREX_RPC_CMD_OK);
}
+
+
+/**
+ * push a remote PCAP on a port
+ *
+ */
+trex_rpc_cmd_rc_e
+TrexRpcCmdPushRemote::_run(const Json::Value &params, Json::Value &result) {
+
+ uint8_t port_id = parse_port(params, result);
+ std::string pcap_filename = parse_string(params, "pcap_filename", result);
+ double ipg_usec = parse_double(params, "ipg_usec", result);
+ double speedup = parse_double(params, "speedup", result);
+ uint32_t count = parse_uint32(params, "count", result);
+ double duration = parse_double(params, "duration", result);
+
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+
+ try {
+ port->push_remote(pcap_filename, ipg_usec, speedup, count, duration);
+ } catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+
+ result["result"] = Json::objectValue;
+ return (TREX_RPC_CMD_OK);
+
+}
+
diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h
index 428bdd7b..affa65c1 100644
--- a/src/rpc-server/commands/trex_rpc_cmds.h
+++ b/src/rpc-server/commands/trex_rpc_cmds.h
@@ -130,5 +130,7 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdUpdateTraffic, "update_traffic", 3, true, APIClass
TREX_RPC_CMD_DEFINE(TrexRpcCmdValidate, "validate", 2, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdPushRemote, "push_remote", 6, true, APIClass::API_CLASS_TYPE_CORE);
+
#endif /* __TREX_RPC_CMD_H__ */
diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp
index 924503f2..7104792e 100644
--- a/src/rpc-server/trex_rpc_cmds_table.cpp
+++ b/src/rpc-server/trex_rpc_cmds_table.cpp
@@ -65,6 +65,7 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() {
register_command(new TrexRpcCmdRemoveRXFilters());
register_command(new TrexRpcCmdValidate());
+ register_command(new TrexRpcCmdPushRemote());
}
diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp
index 1321a362..fc96e00a 100644
--- a/src/stateless/cp/trex_dp_port_events.cpp
+++ b/src/stateless/cp/trex_dp_port_events.cpp
@@ -78,6 +78,9 @@ protected:
virtual void on_event() {
/* do nothing */
}
+ virtual void on_error(int thread_id) {
+ /* do nothing */
+ }
};
void
@@ -105,14 +108,14 @@ TrexDpPortEvents::barrier() {
*
*/
void
-TrexDpPortEvents::on_core_reporting_in(int event_id, int thread_id) {
+TrexDpPortEvents::on_core_reporting_in(int event_id, int thread_id, bool status) {
TrexDpPortEvent *event = lookup(event_id);
/* event might have been deleted */
if (!event) {
return;
}
- bool done = event->on_core_reporting_in(thread_id);
+ bool done = event->on_core_reporting_in(thread_id, status);
if (done) {
destroy_event(event_id);
@@ -150,7 +153,7 @@ TrexDpPortEvent::init(TrexStatelessPort *port, int event_id, int timeout_ms) {
}
bool
-TrexDpPortEvent::on_core_reporting_in(int thread_id) {
+TrexDpPortEvent::on_core_reporting_in(int thread_id, bool status) {
/* mark sure no double signal */
if (m_signal.at(thread_id)) {
std::stringstream err;
@@ -163,6 +166,11 @@ TrexDpPortEvent::on_core_reporting_in(int thread_id) {
m_signal.at(thread_id) = true;
m_pending_cnt--;
+ /* if any core reported an error - mark as a failure */
+ if (!status) {
+ on_error(thread_id);
+ }
+
/* event occured */
if (m_pending_cnt == 0) {
on_event();
diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h
index 3b8c8633..681e47ab 100644
--- a/src/stateless/cp/trex_dp_port_events.h
+++ b/src/stateless/cp/trex_dp_port_events.h
@@ -48,13 +48,22 @@ protected:
*/
virtual void on_event() = 0;
+ /**
+ * when a thread ID encounter an error
+ *
+ * @author imarom (20-Apr-16)
+ *
+ * @param thread_id
+ */
+ virtual void on_error(int thread_id) = 0;
+
TrexStatelessPort *get_port() {
return m_port;
}
private:
void init(TrexStatelessPort *port, int event_id, int timeout_ms);
- bool on_core_reporting_in(int thread_id);
+ bool on_core_reporting_in(int thread_id, bool status = true);
std::unordered_map<int, bool> m_signal;
int m_pending_cnt;
@@ -98,7 +107,7 @@ public:
/**
* a core has reached the event
*/
- void on_core_reporting_in(int event_id, int thread_id);
+ void on_core_reporting_in(int event_id, int thread_id, bool status = true);
private:
TrexDpPortEvent *lookup(int event_id);
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 90142d9b..360cc7d6 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -24,6 +24,7 @@ limitations under the License.
#include <trex_stateless_messaging.h>
#include <trex_streams_compiler.h>
#include <common/basic_utils.h>
+#include <common/captureFile.h>
#include <string>
@@ -70,6 +71,20 @@ protected:
assert(get_port()->m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID);
get_port()->m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
}
+
+ /**
+ * when a DP core encountered an error
+ *
+ * @author imarom (20-Apr-16)
+ */
+ virtual void on_error(int thread_id) {
+ Json::Value data;
+
+ data["port_id"] = get_port()->get_port_id();
+ data["thread_id"] = thread_id;
+
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_ERROR, data);
+ }
};
/***************************
@@ -240,6 +255,13 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
}
+bool TrexStatelessPort::is_active() const {
+ return ( (m_port_state == PORT_STATE_TX)
+ || (m_port_state == PORT_STATE_PAUSE)
+ || (m_port_state == PORT_STATE_PCAP_TX)
+ );
+}
+
/**
* stop traffic on port
*
@@ -249,9 +271,7 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
*/
void
TrexStatelessPort::stop_traffic(void) {
-
- if (!( (m_port_state == PORT_STATE_TX)
- || (m_port_state == PORT_STATE_PAUSE) )) {
+ if (!is_active()) {
return;
}
@@ -395,6 +415,55 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul, bool force) {
}
+void
+TrexStatelessPort::push_remote(const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count,
+ double duration) {
+
+ /* command allowed only on state stream */
+ verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS);
+
+ /* check that file exists */
+ CCapReaderBase *reader;
+ std::stringstream ss;
+ reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
+ if (!reader) {
+ throw TrexException(ss.str());
+ }
+ delete reader;
+
+ /* only one core gets to play */
+ int tx_core = m_cores_id_list[0];
+
+ /* create async event */
+ assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID);
+ m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent());
+
+ /* mark all other cores as done */
+ for (int index = 1; index < m_cores_id_list.size(); index++) {
+ /* mimic an end event */
+ m_dp_events.on_core_reporting_in(m_pending_async_stop_event, m_cores_id_list[index]);
+ }
+
+ /* send a message to core */
+ change_state(PORT_STATE_PCAP_TX);
+ TrexStatelessCpToDpMsgBase *push_msg = new TrexStatelessDpPushPCAP(m_port_id,
+ m_pending_async_stop_event,
+ pcap_filename,
+ ipg_usec,
+ speedup,
+ count,
+ duration);
+ send_message_to_dp(tx_core, push_msg);
+
+ /* update subscribers */
+ Json::Value data;
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data);
+}
+
std::string
TrexStatelessPort::get_state_as_string() const {
@@ -413,6 +482,9 @@ TrexStatelessPort::get_state_as_string() const {
case PORT_STATE_PAUSE:
return "PAUSE";
+
+ case PORT_STATE_PCAP_TX:
+ return "PCAP_TX";
}
return "UNKNOWN";
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 520940d8..8856b429 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -129,11 +129,12 @@ public:
* port state
*/
enum port_state_e {
- PORT_STATE_DOWN = 0x1,
- PORT_STATE_IDLE = 0x2,
- PORT_STATE_STREAMS = 0x4,
- PORT_STATE_TX = 0x8,
- PORT_STATE_PAUSE = 0x10,
+ PORT_STATE_DOWN = 0x1,
+ PORT_STATE_IDLE = 0x2,
+ PORT_STATE_STREAMS = 0x4,
+ PORT_STATE_TX = 0x8,
+ PORT_STATE_PAUSE = 0x10,
+ PORT_STATE_PCAP_TX = 0x20,
};
/**
@@ -212,6 +213,16 @@ public:
void update_traffic(const TrexPortMultiplier &mul, bool force);
/**
+ * push a PCAP file onto the port
+ *
+ */
+ void push_remote(const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count,
+ double duration);
+
+ /**
* get the port state
*
*/
@@ -220,6 +231,12 @@ public:
}
/**
+ * return true if the port is active
+ * (paused is considered active)
+ */
+ bool is_active() const;
+
+ /**
* port state as string
*
*/
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index 13bf5a5d..c5963625 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -358,6 +358,51 @@ bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
return (true);
}
+bool TrexStatelessDpPerPort::push_pcap(uint8_t port_id,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count) {
+
+ /* push pcap can only happen on an idle port from the core prespective */
+ assert(m_state == TrexStatelessDpPerPort::ppSTATE_IDLE);
+
+ CGenNodePCAP *pcap_node = m_core->allocate_pcap_node();
+ if (!pcap_node) {
+ return (false);
+ }
+
+ pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(port_id);
+ socket_id_t socket_id = m_core->m_node_gen.m_socket_id;
+
+ uint8_t mac_addr[12];
+ m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, mac_addr);
+
+ bool rc = pcap_node->create(port_id,
+ dir,
+ socket_id,
+ mac_addr,
+ pcap_filename,
+ ipg_usec,
+ speedup,
+ count);
+ if (!rc) {
+ m_core->free_node((CGenNode *)pcap_node);
+ return (false);
+ }
+
+ /* schedule the node for now */
+ pcap_node->m_time = m_core->m_cur_time_sec;
+ m_core->m_node_gen.add_node((CGenNode *)pcap_node);
+
+ /* hold a pointer to the node */
+ assert(m_active_pcap_node == NULL);
+ m_active_pcap_node = pcap_node;
+
+ m_state = TrexStatelessDpPerPort::ppSTATE_PCAP_TX;
+ return (true);
+}
+
bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
bool stop_on_id,
@@ -390,6 +435,19 @@ bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
}
}
+ /* check for active PCAP node */
+ if (m_active_pcap_node) {
+ /* when got async stop from outside or duration */
+ if (m_active_pcap_node->is_active()) {
+ m_active_pcap_node->mark_for_free();
+ } else {
+ /* graceful stop - node was put out by the scheduler */
+ m_core->free_node( (CGenNode *)m_active_pcap_node);
+ }
+
+ m_active_pcap_node = NULL;
+ }
+
/* active stream should be zero */
assert(m_active_streams==0);
m_active_nodes.clear();
@@ -401,9 +459,9 @@ bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
void TrexStatelessDpPerPort::create(CFlowGenListPerThread * core){
m_core=core;
m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
- m_port_id=0;
m_active_streams=0;
m_active_nodes.clear();
+ m_active_pcap_node = NULL;
}
@@ -709,6 +767,7 @@ void
TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port,
TrexStream * stream,
TrexStreamsCompiledObj *comp) {
+
CGenNodeStateless *node = m_core->create_node_sl();
node->cache_mbuf_array_init();
@@ -977,6 +1036,42 @@ TrexStatelessDpCore::pause_traffic(uint8_t port_id){
lp_port->pause_traffic(port_id);
}
+
+void
+TrexStatelessDpCore::push_pcap(uint8_t port_id,
+ int event_id,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count,
+ double duration) {
+
+ TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
+
+ lp_port->set_event_id(event_id);
+
+ /* delegate the command to the port */
+ bool rc = lp_port->push_pcap(port_id, pcap_filename, ipg_usec, speedup, count);
+ if (!rc) {
+ /* report back that we stopped */
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
+ TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
+ port_id,
+ event_id,
+ false);
+ ring->Enqueue((CGenNode *)event_msg);
+ return;
+ }
+
+
+ if (duration > 0.0) {
+ add_port_duration(duration, port_id, event_id);
+ }
+
+ m_state = TrexStatelessDpCore::STATE_PCAP_TX;
+}
+
+
void
TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
@@ -995,21 +1090,10 @@ TrexStatelessDpCore::stop_traffic(uint8_t port_id,
the scheduler invokes it, it will be free */
TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
-
if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
- /* nothing to do ! already stopped */
- //printf(" skip .. %f\n",m_core->m_cur_time_sec);
return;
}
- /* inform the control plane we stopped - this might be a async stop
- (streams ended)
- */
- #if 0
- if ( are_all_ports_idle() ) {
- /* just a place holder if we will need to do somthing in that case */
- }
- #endif
CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
@@ -1038,3 +1122,87 @@ TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
event_id);
ring->Enqueue((CGenNode *)event_msg);
}
+
+
+/**
+ * PCAP node
+ */
+bool CGenNodePCAP::create(uint8_t port_id,
+ pkt_dir_t dir,
+ socket_id_t socket_id,
+ const uint8_t *mac_addr,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count) {
+ std::stringstream ss;
+
+ m_type = CGenNode::PCAP_PKT;
+ m_flags = 0;
+ m_src_port = 0;
+ m_port_id = port_id;
+ m_count = count;
+
+ /* mark this node as slow path */
+ set_slow_path(true);
+
+ if (ipg_usec != -1) {
+ /* fixed IPG */
+ m_ipg_sec = usec_to_sec(ipg_usec / speedup);
+ m_speedup = 0;
+ } else {
+ /* packet IPG */
+ m_ipg_sec = -1;
+ m_speedup = speedup;
+ }
+
+ /* copy MAC addr info */
+ memcpy(m_mac_addr, mac_addr, 12);
+
+ /* set the dir */
+ set_mbuf_dir(dir);
+ set_socket_id(socket_id);
+
+ /* create the PCAP reader */
+ m_reader = CCapReaderFactory::CreateReader((char *)pcap_filename.c_str(), 0, ss);
+ if (!m_reader) {
+ return false;
+ }
+
+ m_raw_packet = new CCapPktRaw();
+ if ( m_reader->ReadPacket(m_raw_packet) == false ){
+ /* handle error */
+ delete m_reader;
+ return (false);
+ }
+
+ /* this is the reference time */
+ //m_base_time = m_raw_packet->get_time();
+ m_last_pkt_time = m_raw_packet->get_time();
+
+ /* ready */
+ m_state = PCAP_ACTIVE;
+
+ return true;
+}
+
+/**
+ * cleanup for PCAP node
+ *
+ * @author imarom (08-May-16)
+ */
+void CGenNodePCAP::destroy() {
+
+ if (m_raw_packet) {
+ delete m_raw_packet;
+ m_raw_packet = NULL;
+ }
+
+ if (m_reader) {
+ delete m_reader;
+ m_reader = NULL;
+ }
+
+ m_state = PCAP_INVALID;
+}
+
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index bdf84cfd..af2187ae 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -33,7 +33,7 @@ class CFlowGenListPerThread;
class CGenNodeStateless;
class TrexStreamsCompiledObj;
class TrexStream;
-
+class CGenNodePCAP;
class CDpOneStream {
public:
@@ -54,7 +54,8 @@ public:
enum state_e {
ppSTATE_IDLE,
ppSTATE_TRANSMITTING,
- ppSTATE_PAUSE
+ ppSTATE_PAUSE,
+ ppSTATE_PCAP_TX,
};
@@ -70,6 +71,12 @@ public:
bool update_traffic(uint8_t port_id, double factor);
+ bool push_pcap(uint8_t port_id,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count);
+
bool stop_traffic(uint8_t port_id,
bool stop_on_id,
int event_id);
@@ -91,11 +98,11 @@ public:
public:
state_e m_state;
- uint8_t m_port_id;
uint32_t m_active_streams; /* how many active streams on this port */
std::vector<CDpOneStream> m_active_nodes; /* holds the current active nodes */
+ CGenNodePCAP *m_active_pcap_node;
CFlowGenListPerThread * m_core ;
int m_event_id;
};
@@ -113,6 +120,7 @@ public:
enum state_e {
STATE_IDLE,
STATE_TRANSMITTING,
+ STATE_PCAP_TX,
STATE_TERMINATE
};
@@ -151,7 +159,7 @@ public:
*/
void start_traffic(TrexStreamsCompiledObj *obj,
double duration,
- int m_event_id);
+ int event_id);
/* pause the streams, work only if all are continues */
@@ -163,6 +171,19 @@ public:
/**
+ * push a PCAP file on port
+ *
+ */
+ void push_pcap(uint8_t port_id,
+ int event_id,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count,
+ double duration);
+
+
+ /**
* update current traffic rate
*
* @author imarom (25-Nov-15)
diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h
index e9e5cf5b..b5395e78 100644
--- a/src/stateless/dp/trex_stream_node.h
+++ b/src/stateless/dp/trex_stream_node.h
@@ -26,6 +26,8 @@ limitations under the License.
#include <stdio.h>
class TrexStatelessDpCore;
+class TrexStatelessDpPerPort;
+
#include <trex_stream.h>
class TrexStatelessCpToDpMsgBase;
@@ -448,6 +450,176 @@ private:
static_assert(sizeof(CGenNodeStateless) == sizeof(CGenNode), "sizeof(CGenNodeStateless) != sizeof(CGenNode)" );
+/* this is a event for PCAP transmitting */
+struct CGenNodePCAP : public CGenNodeBase {
+friend class TrexStatelessDpPerPort;
+
+public:
+
+ /**
+ * creates a node from a PCAP file
+ */
+ bool create(uint8_t port_id,
+ pkt_dir_t dir,
+ socket_id_t socket_id,
+ const uint8_t *mac_addr,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count);
+ /**
+ * destroy the node cleaning up any data
+ *
+ */
+ void destroy();
+
+ /**
+ * advance - will read the next packet
+ *
+ * @author imarom (03-May-16)
+ */
+ void next() {
+ assert(is_active());
+
+ /* save the previous packet time */
+ m_last_pkt_time = m_raw_packet->get_time();
+
+ /* advance */
+ if ( m_reader->ReadPacket(m_raw_packet) == false ){
+ m_count--;
+
+ /* if its the end - go home... */
+ if (m_count == 0) {
+ m_state = PCAP_INACTIVE;
+ return;
+ }
+
+ /* rewind and load the first packet */
+ m_reader->Rewind();
+ if (!m_reader->ReadPacket(m_raw_packet)) {
+ m_state = PCAP_INACTIVE;
+ return;
+ }
+ }
+
+ }
+
+ /**
+ * return the time for the next scheduling for a packet
+ *
+ */
+ inline double get_ipg() {
+ assert(m_state != PCAP_INVALID);
+
+ /* fixed IPG */
+ if (m_ipg_sec != -1) {
+ return m_ipg_sec;
+ } else {
+ return ((m_raw_packet->get_time() - m_last_pkt_time) / m_speedup);
+ }
+ }
+
+ /**
+ * get the current packet as MBUF
+ *
+ */
+ inline rte_mbuf_t *get_pkt() {
+ assert(m_state != PCAP_INVALID);
+
+ rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc( get_socket_id(), m_raw_packet->getTotalLen());
+ assert(m);
+
+ char *p = rte_pktmbuf_append(m, m_raw_packet->getTotalLen());
+ assert(p);
+
+ /* copy the packet */
+ memcpy(p, m_raw_packet->raw, m_raw_packet->getTotalLen());
+
+ /* fix the MAC */
+ memcpy(p, m_mac_addr, 12);
+
+ return (m);
+ }
+
+
+ inline void handle(CFlowGenListPerThread *thread) {
+ assert(m_state != PCAP_INVALID);
+ thread->m_node_gen.m_v_if->send_node( (CGenNode *)this);
+
+ // read the next packet
+ next();
+
+ if (is_active()) {
+ m_time += get_ipg();
+ thread->m_node_gen.m_p_queue.push((CGenNode *)this);
+
+ } else {
+ thread->stop_stateless_traffic(get_port_id());
+ }
+ }
+
+ void set_mbuf_dir(pkt_dir_t dir) {
+ if (dir) {
+ m_flags |=NODE_FLAGS_DIR;
+ }else{
+ m_flags &=~NODE_FLAGS_DIR;
+ }
+ }
+
+ inline pkt_dir_t get_mbuf_dir(){
+ return ((pkt_dir_t)( m_flags &1));
+ }
+
+ uint8_t get_port_id() {
+ return m_port_id;
+ }
+
+ void mark_for_free() {
+ m_state = PCAP_MARKED_FOR_FREE;
+ }
+
+ bool is_active() {
+ return (m_state == PCAP_ACTIVE);
+ }
+
+ bool is_marked_for_free() {
+ return (m_state == PCAP_MARKED_FOR_FREE);
+ }
+
+private:
+
+ enum {
+ PCAP_INVALID = 0,
+ PCAP_ACTIVE,
+ PCAP_INACTIVE,
+ PCAP_MARKED_FOR_FREE
+ };
+
+ /* cache line 0 */
+ /* important stuff here */
+ uint8_t m_mac_addr[12];
+ uint8_t m_state;
+
+ double m_last_pkt_time;
+ double m_speedup;
+ double m_ipg_sec;
+ uint32_t m_count;
+
+ double m_next_time_offset; /* in sec */
+
+ CCapReaderBase *m_reader;
+ CCapPktRaw *m_raw_packet;
+
+ uint8_t m_port_id;
+
+ /* pad to match the size of CGenNode */
+ uint8_t m_pad_end[33];
+
+} __rte_cache_aligned;
+
+
+static_assert(sizeof(CGenNodePCAP) == sizeof(CGenNode), "sizeof(CGenNodePCAP) != sizeof(CGenNode)" );
#endif /* __TREX_STREAM_NODE_H__ */
+
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index 7edf0f13..1cbacb6f 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -181,6 +181,36 @@ TrexStatelessDpUpdate::clone() {
return new_msg;
}
+
+/*************************
+ push PCAP message
+ ************************/
+bool
+TrexStatelessDpPushPCAP::handle(TrexStatelessDpCore *dp_core) {
+ dp_core->push_pcap(m_port_id,
+ m_event_id,
+ m_pcap_filename,
+ m_ipg_usec,
+ m_speedup,
+ m_count,
+ m_duration);
+ return true;
+}
+
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpPushPCAP::clone() {
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpPushPCAP(m_port_id,
+ m_event_id,
+ m_pcap_filename,
+ m_ipg_usec,
+ m_speedup,
+ m_count,
+ m_duration);
+
+ return new_msg;
+}
+
+
/*************************
barrier message
************************/
@@ -203,7 +233,7 @@ TrexStatelessDpBarrier::clone() {
bool
TrexDpPortEventMsg::handle() {
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(m_port_id);
- port->get_dp_events().on_core_reporting_in(m_event_id, m_thread_id);
+ port->get_dp_events().on_core_reporting_in(m_event_id, m_thread_id, get_status());
return (true);
}
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index 0eed01bd..9b1f2e31 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -246,6 +246,43 @@ private:
double m_factor;
};
+
+/**
+ * psuh a PCAP message
+ */
+class TrexStatelessDpPushPCAP : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpPushPCAP(uint8_t port_id,
+ int event_id,
+ const std::string &pcap_filename,
+ double ipg_usec,
+ double speedup,
+ uint32_t count,
+ double duration) : m_pcap_filename(pcap_filename) {
+ m_port_id = port_id;
+ m_event_id = event_id;
+ m_ipg_usec = ipg_usec;
+ m_speedup = speedup;
+ m_count = count;
+ m_duration = duration;
+ }
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+private:
+ std::string m_pcap_filename;
+ int m_event_id;
+ double m_ipg_usec;
+ double m_speedup;
+ double m_duration;
+ uint32_t m_count;
+ uint8_t m_port_id;
+};
+
+
/**
* barrier message for DP core
*
@@ -267,6 +304,7 @@ private:
int m_event_id;
};
+
/************************* messages from DP to CP **********************/
/**
@@ -303,10 +341,11 @@ public:
class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase {
public:
- TrexDpPortEventMsg(int thread_id, uint8_t port_id, int event_id) {
- m_thread_id = thread_id;
- m_port_id = port_id;
- m_event_id = event_id;
+ TrexDpPortEventMsg(int thread_id, uint8_t port_id, int event_id, bool status = true) {
+ m_thread_id = thread_id;
+ m_port_id = port_id;
+ m_event_id = event_id;
+ m_status = status;
}
virtual bool handle();
@@ -323,10 +362,15 @@ public:
return m_event_id;
}
+ bool get_status() {
+ return m_status;
+ }
+
private:
int m_thread_id;
uint8_t m_port_id;
int m_event_id;
+ bool m_status;
};