summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorDan Klein <danklein10@gmail.com>2015-11-24 08:54:53 +0200
committerDan Klein <danklein10@gmail.com>2015-11-24 08:54:53 +0200
commite7cb8b0f6c2fbe08d2086a7408040ac7d12aee5a (patch)
tree1b27e542fe9f3ae4abdc8245b804cda25a6e2c2f /src
parent597f74d8ed10abc3dd9df7e81ecea5ac2f5c714e (diff)
parentf3861d504353729724086dec82c79e818224554f (diff)
Merge branch 'master' into dan_stateless
Diffstat (limited to 'src')
-rwxr-xr-xsrc/bp_gtest.cpp35
-rwxr-xr-xsrc/bp_sim.cpp396
-rwxr-xr-xsrc/bp_sim.h95
-rwxr-xr-xsrc/common/Network/Packet/IPHeader.cpp2
-rwxr-xr-xsrc/common/Network/Packet/TCPHeader.cpp2
-rwxr-xr-xsrc/common/c_common.h2
-rw-r--r--src/gtest/rpc_test.cpp101
-rw-r--r--src/gtest/trex_stateless_gtest.cpp1761
-rwxr-xr-xsrc/gtest/tuple_gen_test.cpp8
-rw-r--r--src/internal_api/trex_platform_api.h4
-rwxr-xr-xsrc/main.cpp71
-rwxr-xr-xsrc/main_dpdk.cpp445
-rwxr-xr-xsrc/msg_manager.cpp26
-rwxr-xr-xsrc/msg_manager.h1
-rwxr-xr-xsrc/nat_check.cpp4
-rwxr-xr-xsrc/pal/linux/mbuf.cpp7
-rwxr-xr-xsrc/pal/linux/mbuf.h2
-rwxr-xr-xsrc/pal/linux_dpdk/mbuf.h4
-rwxr-xr-xsrc/platform_cfg.cpp8
-rwxr-xr-xsrc/platform_cfg.h2
-rw-r--r--src/publisher/trex_publisher.cpp107
-rw-r--r--src/publisher/trex_publisher.h54
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_general.cpp32
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_stream.cpp234
-rw-r--r--src/rpc-server/commands/trex_rpc_cmds.h13
-rw-r--r--src/rpc-server/trex_rpc_cmds_table.cpp5
-rw-r--r--src/rpc-server/trex_rpc_req_resp_server.h2
-rwxr-xr-xsrc/rx_check.cpp24
-rwxr-xr-xsrc/rx_check.h7
-rwxr-xr-xsrc/rx_check_header.cpp10
-rw-r--r--src/stateless/cp/trex_dp_port_events.cpp220
-rw-r--r--src/stateless/cp/trex_dp_port_events.h171
-rw-r--r--src/stateless/cp/trex_stateless.cpp4
-rw-r--r--src/stateless/cp/trex_stateless.h9
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp239
-rw-r--r--src/stateless/cp/trex_stateless_port.h167
-rw-r--r--src/stateless/cp/trex_stream.cpp74
-rw-r--r--src/stateless/cp/trex_stream.h171
-rw-r--r--src/stateless/cp/trex_streams_compiler.cpp394
-rw-r--r--src/stateless/cp/trex_streams_compiler.h59
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp526
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h158
-rw-r--r--src/stateless/dp/trex_stream_node.h197
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp128
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h219
-rw-r--r--src/stub/trex_stateless_stub.cpp3
-rwxr-xr-xsrc/time_histogram.cpp6
-rwxr-xr-xsrc/tuple_gen.h3
-rwxr-xr-xsrc/utl_json.cpp4
-rwxr-xr-xsrc/utl_yaml.cpp2
50 files changed, 5315 insertions, 903 deletions
diff --git a/src/bp_gtest.cpp b/src/bp_gtest.cpp
index e3145f2a..a94c2d37 100755
--- a/src/bp_gtest.cpp
+++ b/src/bp_gtest.cpp
@@ -66,7 +66,7 @@ int test_priorty_queue(void){
int i;
for (i=0; i<10; i++) {
node = new CGenNode();
- printf(" +%x \n",node);
+ printf(" +%p \n",node);
node->m_flow_id = 10-i;
node->m_pkt_info = (CFlowPktInfo *)(uintptr_t)i;
node->m_time = (double)i+0.1;
@@ -74,7 +74,7 @@ int test_priorty_queue(void){
}
while (!p_queue.empty()) {
node = p_queue.top();
- printf(" -->%x \n",node);
+ printf(" -->%p \n",node);
//node->Dump(stdout);
p_queue.pop();
//delete node;
@@ -131,16 +131,6 @@ int test_human_p(){
-static bool was_init=false;
-
-void gtest_init_once(){
-
- if ( !was_init ){
- CGlobalInfo::init_pools(1000);
- time_init();
- was_init=true;
- }
-}
@@ -159,7 +149,7 @@ public:
bool init(void){
- uint16 * ports;
+ uint16 * ports = NULL;
CTupleBase tuple;
CErfIF erf_vif;
@@ -259,7 +249,6 @@ public:
class basic : public testing::Test {
protected:
virtual void SetUp() {
- gtest_init_once();
}
virtual void TearDown() {
}
@@ -269,7 +258,6 @@ public:
class cpu : public testing::Test {
protected:
virtual void SetUp() {
- gtest_init_once();
}
virtual void TearDown() {
}
@@ -663,6 +651,7 @@ TEST_F(basic, latency1) {
po->preview.setFileWrite(true);
uint8_t mac[]={0,0,0,1,0,0};
+ (void)mac;
CErfIF erf_vif;
erf_vif.set_review_mode(&CGlobalInfo::m_options.preview);
@@ -714,6 +703,7 @@ TEST_F(basic, latency2) {
uint8_t mac[]={0,0,0,1,0,0};
+ (void)mac;
mac[0]=0;
mac[1]=0;
@@ -728,14 +718,13 @@ TEST_F(basic, latency2) {
int i;
for (i=0; i<100; i++) {
- uint8_t *p;
rte_mbuf_t * m=l.generate_pkt(0);
- p=rte_pktmbuf_mtod(m, uint8_t*);
+ rte_pktmbuf_mtod(m, uint8_t*);
//utl_DumpBuffer(stdout,p,l.get_pkt_size(),0);
port0.update_packet(m);
- p=rte_pktmbuf_mtod(m, uint8_t*);
+ rte_pktmbuf_mtod(m, uint8_t*);
//utl_DumpBuffer(stdout,p,l.get_pkt_size(),0);
//printf("offset is : %d \n",l.get_payload_offset());
@@ -763,6 +752,7 @@ TEST_F(basic, latency3) {
uint8_t mac[]={0,0,0,1,0,0};
+ (void)mac;
mac[0]=0;
@@ -850,6 +840,7 @@ public:
TEST_F(basic, latency4) {
uint8_t mac[]={0,0,0,1,0,0};
+ (void)mac;
mac[0]=0;
mac[1]=0;
@@ -1196,7 +1187,6 @@ TEST_F(cpu, cpu3) {
class timerwl : public testing::Test {
protected:
virtual void SetUp() {
- gtest_init_once();
}
virtual void TearDown() {
}
@@ -1447,7 +1437,6 @@ TEST_F(timerwl, many_timers_with_stop) {
class rx_check : public testing::Test {
protected:
virtual void SetUp() {
- gtest_init_once();
m_rx_check.Create();
}
@@ -2125,7 +2114,7 @@ class CRxCheck1 : public CRxCheckCallbackBase {
public:
virtual void handle_packet(rte_mbuf_t * m){
- char *mp=rte_pktmbuf_mtod(m, char*);
+ rte_pktmbuf_mtod(m, char*);
CRx_check_header * rx_p;
rte_mbuf_t * m2 = m->next;
rx_p=(CRx_check_header *)rte_pktmbuf_mtod(m2, char*);
@@ -2139,7 +2128,6 @@ public:
class rx_check_system : public testing::Test {
protected:
virtual void SetUp() {
- gtest_init_once();
m_rx_check.m_callback=&m_callback;
m_callback.mg =&m_mg;
@@ -2417,8 +2405,6 @@ public:
class nat_check_system : public testing::Test {
protected:
virtual void SetUp() {
- gtest_init_once();
-
m_rx_check.m_callback=&m_callback;
m_callback.mg =&m_mg;
m_mg.Create();
@@ -2464,7 +2450,6 @@ class file_flow_info : public testing::Test {
protected:
virtual void SetUp() {
- gtest_init_once();
assert(m_flow_info.Create());
}
diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp
index 92beab91..a61fbb8f 100755
--- a/src/bp_sim.cpp
+++ b/src/bp_sim.cpp
@@ -26,6 +26,7 @@ limitations under the License.
#include <common/basic_utils.h>
#include <trex_stream_node.h>
+#include <trex_stateless_messaging.h>
#undef VALG
@@ -72,11 +73,11 @@ void CGlobalMemory::Dump(FILE *fd){
c_size=c_size*2;
}
- fprintf(fd," %-40s : %lu \n",names[i].c_str(),m_mbuf[i]);
+ fprintf(fd," %-40s : %lu \n",names[i].c_str(),(ulong)m_mbuf[i]);
}
c_total += (m_mbuf[MBUF_DP_FLOWS] * sizeof(CGenNode));
- fprintf(fd," %-40s : %lu \n","get_each_core_dp_flows",get_each_core_dp_flows());
+ fprintf(fd," %-40s : %lu \n","get_each_core_dp_flows",(ulong)get_each_core_dp_flows());
fprintf(fd," %-40s : %s \n","Total memory",double_to_human_str(c_total,"bytes",KBYE_1024).c_str() );
}
@@ -218,7 +219,7 @@ bool CPlatformSocketInfoConfig::init(){
m_max_threads_per_dual_if = num_threads;
}else{
if (lp->m_threads.size() != num_threads) {
- printf("ERROR number of threads per dual ports should be the same for all dual ports\n");
+ printf("ERROR, the number of threads per dual ports should be the same for all dual ports\n");
exit(1);
}
}
@@ -230,7 +231,7 @@ bool CPlatformSocketInfoConfig::init(){
uint8_t phy_thread = lp->m_threads[j];
if (phy_thread>MAX_THREADS_SUPPORTED) {
- printf("ERROR physical thread id is %d higher than max %d \n",phy_thread,MAX_THREADS_SUPPORTED);
+ printf("ERROR, physical thread id is %d higher than max %d \n",phy_thread,MAX_THREADS_SUPPORTED);
exit(1);
}
@@ -240,7 +241,7 @@ bool CPlatformSocketInfoConfig::init(){
}
if ( m_thread_phy_to_virtual[phy_thread] ){
- printf("ERROR physical thread %d defined twice %d \n",phy_thread);
+ printf("ERROR physical thread %d defined twice\n",phy_thread);
exit(1);
}
m_thread_phy_to_virtual[phy_thread]=virt_thread;
@@ -269,7 +270,7 @@ bool CPlatformSocketInfoConfig::init(){
void CPlatformSocketInfoConfig::dump(FILE *fd){
- fprintf(fd," core_mask %x \n",get_cores_mask());
+ fprintf(fd," core_mask %llx \n",(unsigned long long)get_cores_mask());
fprintf(fd," sockets :");
int i;
for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
@@ -478,8 +479,8 @@ void CPlatformSocketInfo::dump(FILE *fd){
void CRteMemPool::dump_in_case_of_error(FILE *fd){
- fprintf(fd," ERROR ERROR there is no enough memory in socket %d \n",m_pool_id);
- fprintf(fd," Try to enlarge the memory values in the configuration file /etc/trex_cfg.yaml \n");
+ fprintf(fd," ERROR,there is no enough memory in socket %d \n",m_pool_id);
+ fprintf(fd," Try to enlarge the memory values in the configuration file -/etc/trex_cfg.yaml ,see manual for more detail \n");
dump(fd);
}
@@ -496,6 +497,26 @@ void CRteMemPool::dump(FILE *fd){
}
////////////////////////////////////////
+
+void CGlobalInfo::free_pools(){
+ CPlatformSocketInfo * lpSocket =&m_socket;
+ CRteMemPool * lpmem;
+ int i;
+ for (i=0; i<(int)MAX_SOCKETS_SUPPORTED; i++) {
+ if (lpSocket->is_sockets_enable((socket_id_t)i)) {
+ lpmem= &m_mem_pool[i];
+ utl_rte_mempool_delete(lpmem->m_big_mbuf_pool);
+ utl_rte_mempool_delete(lpmem->m_small_mbuf_pool);
+ utl_rte_mempool_delete(lpmem->m_mbuf_pool_128);
+ utl_rte_mempool_delete(lpmem->m_mbuf_pool_256);
+ utl_rte_mempool_delete(lpmem->m_mbuf_pool_512);
+ utl_rte_mempool_delete(lpmem->m_mbuf_pool_1024);
+ }
+ utl_rte_mempool_delete(m_mem_pool[0].m_mbuf_global_nodes);
+ }
+}
+
+
void CGlobalInfo::init_pools(uint32_t rx_buffers){
/* this include the pkt from 64- */
CGlobalMemory * lp=&CGlobalInfo::m_memory_cfg;
@@ -748,9 +769,7 @@ int CErfIF::write_pkt(CCapPktRaw *pkt_raw){
int CErfIF::close_file(void){
BP_ASSERT(m_raw);
- m_raw->raw=0;
delete m_raw;
-
if ( m_preview_mode->getFileWrite() ){
BP_ASSERT(m_writer);
delete m_writer;
@@ -821,7 +840,6 @@ void CPacketIndication::UpdatePacketPadding(){
void CPacketIndication::RefreshPointers(){
char *pobase=getBasePtr();
- CPacketIndication * obj=this;
m_ether = (EthernetHeader *) (pobase + m_ether_offset);
l3.m_ipv4 = (IPHeader *) (pobase + m_ip_offset);
@@ -1672,7 +1690,6 @@ char * CFlowPktInfo::push_ipv4_option_offline(uint8_t bytes){
void CFlowPktInfo::mask_as_learn(){
- char *p;
CNatOption *lpNat;
if ( m_pkt_indication.is_ipv6() ){
lpNat=(CNatOption *)push_ipv6_option_offline(CNatOption::noOPTION_LEN);
@@ -2266,7 +2283,7 @@ void CCCapFileMemoryUsage::dump(FILE *fd){
int c_total=0;
for (i=0; i<CCCapFileMemoryUsage::MASK_SIZE; i++) {
- fprintf(fd," size_%-7d : %lu \n",c_size,m_buf[i]);
+ fprintf(fd," size_%-7d : %lu \n",c_size, (ulong)m_buf[i]);
c_total +=m_buf[i]*c_size;
c_size = c_size*2;
}
@@ -2441,7 +2458,6 @@ void operator >> (const YAML::Node& node, CFlowYamlInfo & fi) {
if ( node.FindValue("dyn_pyload") ){
- int i;
const YAML::Node& dyn_pyload = node["dyn_pyload"];
for(unsigned i=0;i<dyn_pyload.size();i++) {
CFlowYamlDpPkt fd;
@@ -2840,8 +2856,20 @@ void CFlowStats::DumpHeader(FILE *fd){
void CFlowStats::Dump(FILE *fd){
//"name","cps","f-pkts","f-bytes","Mb/sec","MB/sec","c-flows","PPS","total-Mbytes-duration","errors","flows"
fprintf(fd," %02d, %-40s ,%4.2f,%4.2f, %5.0f , %7.0f ,%7.2f ,%7.2f , %7.2f , %10.0f , %5.0f , %7.0f , %llu , %llu \n",
- m_id,m_name.c_str(),m_cps,get_normal_cps(),
- m_pkt,m_bytes,duration_sec,m_mb_sec,m_mB_sec,m_c_flows,m_pps,m_total_Mbytes,m_errors,m_flows);
+ m_id,
+ m_name.c_str(),
+ m_cps,
+ get_normal_cps(),
+ m_pkt,
+ m_bytes,
+ duration_sec,
+ m_mb_sec,
+ m_mB_sec,
+ m_c_flows,
+ m_pps,
+ m_total_Mbytes,
+ (unsigned long long)m_errors,
+ (unsigned long long)m_flows);
}
bool CFlowGeneratorRecPerThread::Create(CTupleGeneratorSmart * global_gen,
@@ -3045,22 +3073,31 @@ void CGenNode::DumpHeader(FILE *fd){
fprintf(fd," pkt_id,time,fid,pkt_info,pkt,len,type,is_init,is_last,type,thread_id,src_ip,dest_ip,src_port \n");
}
+
+void CGenNode::free_gen_node(){
+ rte_mbuf_t * m=get_cache_mbuf();
+ if ( unlikely(m != NULL) ) {
+ rte_pktmbuf_free(m);
+ m_plugin_info=0;
+ }
+}
+
+
void CGenNode::Dump(FILE *fd){
- fprintf(fd,"%.6f,%llx,%p,%llu,%d,%d,%d,%d,%d,%d,%x,%x,%d\n",m_time,m_flow_id,m_pkt_info,
- m_pkt_info->m_pkt_indication.m_packet->pkt_cnt,
- m_pkt_info->m_pkt_indication.m_packet->pkt_len,
- m_pkt_info->m_pkt_indication.m_desc.getId(),
- (m_pkt_info->m_pkt_indication.m_desc.IsInitSide()?1:0),
- m_pkt_info->m_pkt_indication.m_desc.IsLastPkt(),
+ fprintf(fd,"%.6f,%llx,%p,%llu,%d,%d,%d,%d,%d,%d,%x,%x,%d\n",
+ m_time,
+ (unsigned long long)m_flow_id,
+ m_pkt_info,
+ (unsigned long long)m_pkt_info->m_pkt_indication.m_packet->pkt_cnt,
+ m_pkt_info->m_pkt_indication.m_packet->pkt_len,
+ m_pkt_info->m_pkt_indication.m_desc.getId(),
+ (m_pkt_info->m_pkt_indication.m_desc.IsInitSide()?1:0),
+ m_pkt_info->m_pkt_indication.m_desc.IsLastPkt(),
m_type,
m_thread_id,
m_src_ip,
m_dest_ip,
- m_src_port
-
-
-
- );
+ m_src_port);
}
@@ -3093,6 +3130,13 @@ void CNodeGenerator::remove_all(CFlowGenListPerThread * thread){
while (!m_p_queue.empty()) {
node = m_p_queue.top();
m_p_queue.pop();
+ /* sanity check */
+ if (node->m_type == CGenNode::STATELESS_PKT) {
+ CGenNodeStateless * p=(CGenNodeStateless *)node;
+ /* need to be changed in Pause support */
+ assert(p->is_mask_for_free());
+ }
+
thread->free_node( node);
}
}
@@ -3108,6 +3152,7 @@ int CNodeGenerator::open_file(std::string file_name,
return (0);
}
+
int CNodeGenerator::close_file(CFlowGenListPerThread * thread){
remove_all(thread);
BP_ASSERT(m_v_if);
@@ -3115,10 +3160,19 @@ int CNodeGenerator::close_file(CFlowGenListPerThread * thread){
return (0);
}
+int CNodeGenerator::update_stl_stats(CGenNodeStateless *node_sl){
+ if ( m_preview_mode.getVMode() >2 ){
+ fprintf(stdout," %4lu ,", (ulong)m_cnt);
+ node_sl->Dump(stdout);
+ m_cnt++;
+ }
+ return (0);
+}
+
int CNodeGenerator::update_stats(CGenNode * node){
if ( m_preview_mode.getVMode() >2 ){
- fprintf(stdout," %llu ,",m_cnt);
+ fprintf(stdout," %llu ,", (unsigned long long)m_cnt);
node->Dump(stdout);
m_cnt++;
}
@@ -3132,6 +3186,8 @@ bool CFlowGenListPerThread::Create(uint32_t thread_id,
uint32_t max_threads){
+ m_non_active_nodes = 0;
+ m_terminated_by_master=false;
m_flow_list =flow_list;
m_core_id= core_id;
m_tcp_dpc= 0;
@@ -3203,7 +3259,7 @@ bool CFlowGenListPerThread::Create(uint32_t thread_id,
assert(m_ring_to_rx);
/* create the info required for stateless DP core */
- m_stateless_dp_info = new TrexStatelessDpCore(thread_id, this);
+ m_stateless_dp_info.create(thread_id, this);
return (true);
}
@@ -3353,8 +3409,7 @@ void CFlowGenListPerThread::Delete(){
Clean();
m_cpu_cp_u.Delete();
- delete m_stateless_dp_info;
- m_stateless_dp_info = NULL;
+ utl_rte_mempool_delete(m_node_pool);
}
@@ -3401,15 +3456,24 @@ int CNodeGenerator::flush_file(dsec_t max_time,
bool done=false;
thread->m_cpu_dp_u.start_work();
- while (!m_p_queue.empty()) {
+
+ /**
+ * if a positive value was given to max time
+ * schedule an exit node
+ */
+ if ( (max_time > 0) && (!always) ) {
+ CGenNode *exit_node = thread->create_node();
+
+ exit_node->m_type = CGenNode::EXIT_SCHED;
+ exit_node->m_time = max_time;
+ add_node(exit_node);
+ }
+
+ while (true) {
+
node = m_p_queue.top();
- n_time = node->m_time+ offset;
+ n_time = node->m_time + offset;
- if (( (n_time) > max_time ) &&
- (always==false) ) {
- /* nothing to do */
- break;
- }
events++;
/*#ifdef VALG
if (events > 1 ) {
@@ -3421,8 +3485,6 @@ int CNodeGenerator::flush_file(dsec_t max_time,
dsec_t dt ;
thread->m_cpu_dp_u.commit();
- thread->check_msgs();
-
while ( true ) {
dt = now_sec() - n_time ;
@@ -3449,9 +3511,9 @@ int CNodeGenerator::flush_file(dsec_t max_time,
}
}
- #ifndef RTE_DPDK
- thread->check_msgs();
- #endif
+ //#ifndef RTE_DPDK
+ //thread->check_msgs();
+ //#endif
uint8_t type=node->m_type;
@@ -3459,8 +3521,12 @@ int CNodeGenerator::flush_file(dsec_t max_time,
m_p_queue.pop();
CGenNodeStateless *node_sl = (CGenNodeStateless *)node;
+ #ifdef _DEBUG
+ update_stl_stats(node_sl);
+ #endif
+
/* if the stream has been deactivated - end */
- if (unlikely(!node_sl->is_active())) {
+ if ( unlikely( node_sl->is_mask_for_free() ) ) {
thread->free_node(node);
} else {
node_sl->handle(thread);
@@ -3509,12 +3575,18 @@ int CNodeGenerator::flush_file(dsec_t max_time,
}
}else{
- handle_slow_messages(type,node,thread,always);
+ bool exit_sccheduler = handle_slow_messages(type,node,thread,always);
+ if (exit_sccheduler) {
+ break;
+ }
}
}
}
}
+ if ( thread->is_terminated_by_master() ) {
+ return (0);
+ }
if (!always) {
old_offset =offset;
@@ -3525,10 +3597,14 @@ int CNodeGenerator::flush_file(dsec_t max_time,
return (0);
}
-void CNodeGenerator::handle_slow_messages(uint8_t type,
- CGenNode * node,
- CFlowGenListPerThread * thread,
- bool always){
+bool
+CNodeGenerator::handle_slow_messages(uint8_t type,
+ CGenNode * node,
+ CFlowGenListPerThread * thread,
+ bool always){
+
+ /* should we continue after */
+ bool exit_scheduler = false;
if (unlikely (type == CGenNode::FLOW_DEFER_PORT_RELEASE) ) {
m_p_queue.pop();
@@ -3549,7 +3625,7 @@ void CNodeGenerator::handle_slow_messages(uint8_t type,
m_p_queue.pop();
/* time out, need to free the flow and remove the association , we didn't get convertion yet*/
thread->terminate_nat_flows(node);
- return;
+ return (exit_scheduler);
}else{
flush_one_node_to_file(node);
@@ -3569,28 +3645,50 @@ void CNodeGenerator::handle_slow_messages(uint8_t type,
m_p_queue.push(node);
}
- } else if ( type == CGenNode::FLOW_SYNC ) {
+ } else if ( type == CGenNode::FLOW_SYNC ) {
+ /* flow sync message is a sync point for time */
+ thread->m_cur_time_sec = node->m_time;
+
+ /* first pop the node */
m_p_queue.pop();
thread->check_msgs(); /* check messages */
m_v_if->flush_tx_queue(); /* flush pkt each timeout */
- if (always == false) {
+ /* exit in case this is the last node*/
+ if ( m_p_queue.size() == m_parent->m_non_active_nodes ) {
+ thread->free_node(node);
+ exit_scheduler = true;
+ } else {
+ /* schedule for next maintenace */
node->m_time += SYNC_TIME_OUT;
m_p_queue.push(node);
- }else{
- thread->free_node(node);
}
- /* must be the last section of processing */
- } else if ( type == CGenNode::EXIT_SCHED ) {
- remove_all(thread);
+ } else if ( type == CGenNode::EXIT_SCHED ) {
+ m_p_queue.pop();
+ thread->free_node(node);
+ exit_scheduler = true;
+
} else {
- printf(" ERROR type is not valid %d \n",type);
- assert(0);
+ if ( type == CGenNode::COMMAND) {
+ m_p_queue.pop();
+ CGenNodeCommand *node_cmd = (CGenNodeCommand *)node;
+ {
+ TrexStatelessCpToDpMsgBase * cmd=node_cmd->m_cmd;
+ cmd->handle(&thread->m_stateless_dp_info);
+ exit_scheduler = cmd->is_quit();
+ thread->free_node((CGenNode *)node_cmd);/* free the node */
+ }
+ }else{
+ printf(" ERROR type is not valid %d \n",type);
+ assert(0);
+ }
}
+
+ return exit_scheduler;
}
@@ -3829,7 +3927,7 @@ void CFlowGenListPerThread::handel_nat_msg(CGenNodeNatInfo * msg){
void CFlowGenListPerThread::check_msgs(void) {
/* inlined for performance */
- m_stateless_dp_info->periodic_check_for_cp_messages();
+ m_stateless_dp_info.periodic_check_for_cp_messages();
if ( likely ( m_ring_from_rx->isEmpty() ) ) {
return;
@@ -3868,44 +3966,43 @@ void CFlowGenListPerThread::check_msgs(void) {
}
}
-void delay(int msec);
+//void delay(int msec);
-const uint8_t test_udp_pkt[]={
- 0x00,0x00,0x00,0x01,0x00,0x00,
- 0x00,0x00,0x00,0x01,0x00,0x00,
- 0x08,0x00,
- 0x45,0x00,0x00,0x81,
- 0xaf,0x7e,0x00,0x00,
- 0x12,0x11,0xd9,0x23,
- 0x01,0x01,0x01,0x01,
- 0x3d,0xad,0x72,0x1b,
-
- 0x11,0x11,
- 0x11,0x11,
-
- 0x00,0x6d,
- 0x00,0x00,
-
- 0x64,0x31,0x3a,0x61,
- 0x64,0x32,0x3a,0x69,0x64,
- 0x32,0x30,0x3a,0xd0,0x0e,
- 0xa1,0x4b,0x7b,0xbd,0xbd,
- 0x16,0xc6,0xdb,0xc4,0xbb,0x43,
- 0xf9,0x4b,0x51,0x68,0x33,0x72,
- 0x20,0x39,0x3a,0x69,0x6e,0x66,0x6f,
- 0x5f,0x68,0x61,0x73,0x68,0x32,0x30,0x3a,0xee,0xc6,0xa3,
- 0xd3,0x13,0xa8,0x43,0x06,0x03,0xd8,0x9e,0x3f,0x67,0x6f,
- 0xe7,0x0a,0xfd,0x18,0x13,0x8d,0x65,0x31,0x3a,0x71,0x39,
- 0x3a,0x67,0x65,0x74,0x5f,0x70,0x65,0x65,0x72,0x73,0x31,
- 0x3a,0x74,0x38,0x3a,0x3d,0xeb,0x0c,0xbf,0x0d,0x6a,0x0d,
- 0xa5,0x31,0x3a,0x79,0x31,0x3a,0x71,0x65,0x87,0xa6,0x7d,
- 0xe7
-};
+void CFlowGenListPerThread::start_stateless_simulation_file(std::string erf_file_name,
+ CPreviewMode &preview){
+ m_preview_mode = preview;
+ m_node_gen.open_file(erf_file_name,&m_preview_mode);
+}
+
+void CFlowGenListPerThread::stop_stateless_simulation_file(){
+ m_node_gen.m_v_if->close_file();
+}
+
+void CFlowGenListPerThread::start_stateless_daemon_simulation(){
-void CFlowGenListPerThread::start_stateless_daemon(){
- m_stateless_dp_info->start();
+ m_cur_time_sec = 0;
+ m_stateless_dp_info.run_once();
+
+}
+
+
+/* return true if we need to shedule next_stream, */
+
+bool CFlowGenListPerThread::set_stateless_next_node( CGenNodeStateless * cur_node,
+ CGenNodeStateless * next_node){
+ return ( m_stateless_dp_info.set_stateless_next_node(cur_node,next_node) );
+}
+
+
+void CFlowGenListPerThread::start_stateless_daemon(CPreviewMode &preview){
+ m_cur_time_sec = 0;
+ /* set per thread global info, for performance */
+ m_preview_mode = preview;
+ m_node_gen.open_file("",&m_preview_mode);
+
+ m_stateless_dp_info.start();
}
@@ -3951,11 +4048,13 @@ void CFlowGenListPerThread::start_generate_stateful(std::string erf_file_name,
#endif
m_node_gen.flush_file(c_stop_sec,d_time_flow, false,this,old_offset);
+
+
#ifdef VALG
CALLGRIND_STOP_INSTRUMENTATION;
printf (" %llu \n",os_get_hr_tick_64()-_start_time);
#endif
- if ( !CGlobalInfo::m_options.preview.getNoCleanFlowClose() ){
+ if ( !CGlobalInfo::m_options.preview.getNoCleanFlowClose() && (is_terminated_by_master()==false) ){
/* clean close */
m_node_gen.flush_file(m_cur_time_sec, d_time_flow, true,this,old_offset);
}
@@ -3970,6 +4069,12 @@ void CFlowGenListPerThread::start_generate_stateful(std::string erf_file_name,
m_node_gen.close_file(this);
}
+void CFlowGenList::Delete(){
+ clean_p_thread_info();
+ Clean();
+ delete CPluginCallback::callback;
+}
+
bool CFlowGenList::Create(){
check_objects_sizes();
@@ -4001,10 +4106,6 @@ void CFlowGenList::clean_p_thread_info(void){
}
-void CFlowGenList::Delete(){
- clean_p_thread_info();
- Clean();
-}
int CFlowGenList::load_from_mac_file(std::string file_name) {
if ( !utl_is_file_exists (file_name) ){
@@ -4499,9 +4600,12 @@ void CTupleTemplateGenerator::Generate(){
#endif
+static uint32_t get_rand_32(uint32_t MinimumRange,
+ uint32_t MaximumRange) __attribute__ ((unused));
+
+static uint32_t get_rand_32(uint32_t MinimumRange,
+ uint32_t MaximumRange) {
-static uint32_t get_rand_32(uint32_t MinimumRange ,
- uint32_t MaximumRange ){
enum {RANDS_NUM = 2 , RAND_MAX_BITS = 0xf , UNSIGNED_INT_BITS = 0x20 , TWO_BITS_MASK = 0x3};
const double TWO_POWER_32_BITS = 0x10000000 * (double)0x10;
@@ -4535,24 +4639,54 @@ int CNullIF::send_node(CGenNode * node){
}
-int CErfIF::send_node(CGenNode * node){
- if ( m_preview_mode->getFileWrite() ){
- CFlowPktInfo * lp=node->m_pkt_info;
- rte_mbuf_t * m=lp->generate_new_mbuf(node);
+void CErfIF::fill_raw_packet(rte_mbuf_t * m,CGenNode * node,pkt_dir_t dir){
fill_pkt(m_raw,m);
+
CPktNsecTimeStamp t_c(node->m_time);
m_raw->time_nsec = t_c.m_time_nsec;
m_raw->time_sec = t_c.m_time_sec;
-
- pkt_dir_t dir=node->cur_interface_dir();
uint8_t p_id = (uint8_t)dir;
-
m_raw->setInterface(p_id);
+}
+
+
+int CErfIFStl::send_node(CGenNode * _no_to_use){
+
+ if ( m_preview_mode->getFileWrite() ){
+
+ CGenNodeStateless * node_sl=(CGenNodeStateless *) _no_to_use;
+
+ /* check that we have mbuf */
+ rte_mbuf_t * m=node_sl->get_cache_mbuf();
+ assert( m );
+ pkt_dir_t dir=(pkt_dir_t)node_sl->get_mbuf_cache_dir();
+
+ fill_raw_packet(m,_no_to_use,dir);
+ BP_ASSERT(m_writer);
+ bool res=m_writer->write_packet(m_raw);
+
+
+ BP_ASSERT(res);
+ }
+ return (0);
+}
+
+
+int CErfIF::send_node(CGenNode * node){
+
+ if ( m_preview_mode->getFileWrite() ){
+
+ CFlowPktInfo * lp=node->m_pkt_info;
+ rte_mbuf_t * m=lp->generate_new_mbuf(node);
+ pkt_dir_t dir=node->cur_interface_dir();
+
+ fill_raw_packet(m,node,dir);
/* update mac addr dest/src 12 bytes */
uint8_t *p=(uint8_t *)m_raw->raw;
+ int p_id=(int)dir;
memcpy(p,CGlobalInfo::m_options.get_dst_src_mac_addr(p_id),12);
/* If vlan is enabled, add vlan header */
@@ -4734,7 +4868,6 @@ void CCPortLatency::reset(){
static uint8_t nat_is_port_can_send(uint8_t port_id){
- uint8_t offset= ((port_id>>1)<<1);
uint8_t client_index = (port_id %2);
return (client_index ==0 ?1:0);
}
@@ -4856,7 +4989,7 @@ void CCPortLatency::dump_counters_json(std::string & json ){
}
void CCPortLatency::DumpCounters(FILE *fd){
- #define DP_A1(f) if (f) fprintf(fd," %-40s : %llu \n",#f,f)
+ #define DP_A1(f) if (f) fprintf(fd," %-40s : %llu \n",#f, (unsigned long long)f)
fprintf(fd," counter \n");
fprintf(fd," -----------\n");
@@ -4875,7 +5008,7 @@ void CCPortLatency::DumpCounters(FILE *fd){
fprintf(fd," -----------\n");
m_hist.Dump(fd);
- fprintf(fd," %-40s : %llu \n","jitter",get_jitter_usec());
+ fprintf(fd," %-40s : %lu \n","jitter", (ulong)get_jitter_usec());
}
bool CCPortLatency::dump_packet(rte_mbuf_t * m){
@@ -4899,6 +5032,9 @@ bool CCPortLatency::dump_packet(rte_mbuf_t * m){
if ( unlikely( CGlobalInfo::m_options.preview.get_vlan_mode_enable() ) ){
vlan_offset=4;
}
+
+ (void)vlan_offset;
+
// utl_DumpBuffer(stdout,p,pkt_size,0);
return (0);
@@ -4934,9 +5070,7 @@ bool CCPortLatency::check_packet(rte_mbuf_t * m,CRx_check_header * & rx_p){
uint16_t vlan_offset=parser.m_vlan_offset;
uint8_t *p=rte_pktmbuf_mtod(m, uint8_t*);
- rx_p=(CRx_check_header *)0;
- bool managed_by_ip_options=false;
- bool is_rx_check=true;
+ rx_p = (CRx_check_header *)0;
if ( !parser.IsLatencyPkt() ){
@@ -5053,7 +5187,7 @@ void CLatencyManager::Delete(){
static uint8_t swap_port(uint8_t port_id){
uint8_t offset= ((port_id>>1)<<1);
uint8_t client_index = (port_id %2);
- return (offset+client_index^1);
+ return (offset + (client_index ^ 1));
}
@@ -5176,8 +5310,6 @@ void CLatencyManager::run_rx_queue_msgs(uint8_t thread_id,
CGenNodeMsgBase * msg=(CGenNodeMsgBase *)node;
- CGenNodeLatencyPktInfo * msg1=(CGenNodeLatencyPktInfo *)msg;
-
uint8_t msg_type = msg->m_msg_type;
switch (msg_type ) {
case CGenNodeMsgBase::LATENCY_PKT:
@@ -5300,7 +5432,7 @@ void CLatencyManager::start(int iter){
}
if ( iter>0 ){
if ( ( cnt>iter) ){
- printf("stop due iter %d %d \n",iter);
+ printf("stop due iter %d\n",iter);
break;
}
}
@@ -5469,8 +5601,8 @@ void CLatencyManager::DumpRxCheckVerification(FILE *fd,
fprintf(fd," rx_checker is disabled \n");
return;
}
- fprintf(fd," rx_check Tx : %u \n",total_tx_rx_check);
- fprintf(fd," rx_check Rx : %u \n",m_rx_check_manager.getTotalRx() );
+ fprintf(fd," rx_check Tx : %llu \n", (unsigned long long)total_tx_rx_check);
+ fprintf(fd," rx_check Rx : %llu \n", (unsigned long long)m_rx_check_manager.getTotalRx() );
fprintf(fd," rx_check verification :" );
if (m_rx_check_manager.getTotalRx() == total_tx_rx_check) {
fprintf(fd," OK \n" );
@@ -6723,7 +6855,6 @@ bool CSimplePacketParser::Parse(){
EthernetHeader *m_ether = (EthernetHeader *)p;
IPHeader * ipv4=0;
IPv6Header * ipv6=0;
- uint16_t pkt_size=rte_pktmbuf_pkt_len(m);
m_vlan_offset=0;
m_option_offset=0;
@@ -6775,6 +6906,29 @@ bool CSimplePacketParser::Parse(){
}
+/* free the right object.
+ it is classic to use virtual function but we can't do it here and we don't even want to use callback function
+ as we want to save space and in most cases there is nothing to free.
+ this might be changed in the future
+ */
+void CGenNodeBase::free_base(){
+ if ( m_type == FLOW_PKT ) {
+ CGenNode* p=(CGenNode*)this;
+ p->free_gen_node();
+ return;
+ }
+ if (m_type==STATELESS_PKT) {
+ CGenNodeStateless* p=(CGenNodeStateless*)this;
+ p->free_stl_node();
+ return;
+ }
+ if ( m_type == COMMAND ) {
+ CGenNodeCommand* p=(CGenNodeCommand*)this;
+ p->free_command();
+ }
+
+}
+
diff --git a/src/bp_sim.h b/src/bp_sim.h
index af084757..0da7fb99 100755
--- a/src/bp_sim.h
+++ b/src/bp_sim.h
@@ -62,6 +62,12 @@ limitations under the License.
#undef NAT_TRACE_
+static inline double
+usec_to_sec(double usec) {
+ return (usec / (1000 * 1000));
+}
+
+
#define FORCE_NO_INLINE __attribute__ ((noinline))
#define MAX_LATENCY_PORTS 12
@@ -328,6 +334,9 @@ public:
CVirtualIF (){
m_preview_mode =NULL;
}
+
+ virtual ~CVirtualIF(){
+ }
public:
virtual int open_file(std::string file_name)=0;
@@ -372,6 +381,15 @@ public:
* @return
*/
virtual int update_mac_addr_from_global_cfg(pkt_dir_t dir, rte_mbuf_t *m)=0;
+
+ /**
+ * translate a port_id to the correct dir on the core
+ *
+ */
+ virtual pkt_dir_t port_id_to_dir(uint8_t port_id) {
+ return (CS_INVALID);
+ }
+
public:
@@ -885,6 +903,8 @@ public:
/* number of main active sockets. socket #0 is always used */
virtual socket_id_t max_num_active_sockets()=0;
+ virtual ~CPlatformSocketInfoBase() {}
+
public:
/* which socket to allocate memory to each port */
virtual socket_id_t port_to_socket(port_id_t port)=0;
@@ -1129,6 +1149,9 @@ public:
class CGlobalInfo {
public:
static void init_pools(uint32_t rx_buffers);
+ /* for simulation */
+ static void free_pools();
+
static inline rte_mbuf_t * pktmbuf_alloc_small(socket_id_t socket){
return ( m_mem_pool[socket].pktmbuf_alloc_small() );
@@ -1328,8 +1351,8 @@ public:
-#define DP(f) if (f) printf(" %-40s: %llu \n",#f,f)
-#define DP_name(n,f) if (f) printf(" %-40s: %llu \n",n,f)
+#define DP(f) if (f) printf(" %-40s: %llu \n",#f,(unsigned long long)f)
+#define DP_name(n,f) if (f) printf(" %-40s: %llu \n",n,(unsigned long long)f)
#define DP_S(f,f_s) if (f) printf(" %-40s: %s \n",#f,f_s.c_str())
@@ -1365,7 +1388,11 @@ public:
FLOW_PKT_NAT =3,
FLOW_SYNC =4, /* called evey 1 msec */
STATELESS_PKT =5,
- EXIT_SCHED =6
+ EXIT_SCHED =6,
+ COMMAND =7,
+
+ EXIT_PORT_SCHED =8
+
};
@@ -1421,6 +1448,7 @@ public:
}
+ void free_base();
};
@@ -1455,6 +1483,9 @@ public:
uint32_t m_dest_idx;
uint32_t m_end_of_cache_line[6];
+
+public:
+ void free_gen_node();
public:
void Dump(FILE *fd);
@@ -1641,6 +1672,8 @@ public:
+
+
#if __x86_64__
/* size of 64 bytes */
#define DEFER_CLIENTS_NUM (16)
@@ -1791,11 +1824,24 @@ public:
virtual int flush_tx_queue(void);
-private:
+protected:
+
+ void fill_raw_packet(rte_mbuf_t * m,CGenNode * node,pkt_dir_t dir);
+
CFileWriterBase * m_writer;
CCapPktRaw * m_raw;
};
+/* for stateless we have a small changes in case we send the packets for optimization */
+class CErfIFStl : public CErfIF {
+
+public:
+
+ virtual int send_node(CGenNode * node);
+};
+
+
+
static inline int fill_pkt(CCapPktRaw * raw,rte_mbuf_t * m){
raw->pkt_len = m->pkt_len;
char *p=raw->raw;
@@ -1860,6 +1906,8 @@ public:
public:
void add_node(CGenNode * mynode);
void remove_all(CFlowGenListPerThread * thread);
+ void remove_all_stateless(CFlowGenListPerThread * thread);
+
int open_file(std::string file_name,
CPreviewMode * preview);
int close_file(CFlowGenListPerThread * thread);
@@ -1892,9 +1940,12 @@ private:
return (m_v_if->send_node(node));
}
int update_stats(CGenNode * node);
- FORCE_NO_INLINE void handle_slow_messages(uint8_t type,
- CGenNode * node,
- CFlowGenListPerThread * thread,
+ int update_stl_stats(CGenNodeStateless *node_sl);
+
+
+ FORCE_NO_INLINE bool handle_slow_messages(uint8_t type,
+ CGenNode * node,
+ CFlowGenListPerThread * thread,
bool always);
@@ -2370,6 +2421,7 @@ public:
return (uint32_t)((uintptr_t)( ((char *)l3.m_ipv4)-getBasePtr()) );
}else{
BP_ASSERT(0);
+ return (0);
}
}
@@ -3358,6 +3410,13 @@ public:
uint32_t max_threads);
void Delete();
+ void set_terminate_mode(bool is_terminate){
+ m_terminated_by_master =is_terminate;
+ }
+ bool is_terminated_by_master(){
+ return (m_terminated_by_master);
+ }
+
void set_vif(CVirtualIF * v_if){
m_node_gen.set_vif(v_if);
}
@@ -3397,7 +3456,18 @@ public :
public:
void Clean();
void start_generate_stateful(std::string erf_file_name,CPreviewMode &preview);
- void start_stateless_daemon();
+ void start_stateless_daemon(CPreviewMode &preview);
+
+ void start_stateless_daemon_simulation();
+
+ /* open a file for simulation */
+ void start_stateless_simulation_file(std::string erf_file_name,CPreviewMode &preview);
+ /* close a file for simulation */
+ void stop_stateless_simulation_file();
+
+ /* return true if we need to shedule next_stream, */
+ bool set_stateless_next_node( CGenNodeStateless * cur_node,
+ CGenNodeStateless * next_node);
void Dump(FILE *fd);
@@ -3471,6 +3541,7 @@ public:
CNodeGenerator m_node_gen;
public:
uint32_t m_cur_template;
+ uint32_t m_non_active_nodes; /* the number of non active nodes -> nodes that try to stop somthing */
uint64_t m_cur_flow_id;
double m_cur_time_sec;
double m_stop_time_sec;
@@ -3491,7 +3562,8 @@ private:
flow_id_node_t m_flow_id_to_node_lookup;
- TrexStatelessDpCore *m_stateless_dp_info;
+ TrexStatelessDpCore m_stateless_dp_info;
+ bool m_terminated_by_master;
private:
uint8_t m_cacheline_pad[RTE_CACHE_LINE_SIZE][19]; // improve prefech
@@ -3506,7 +3578,10 @@ inline CGenNode * CFlowGenListPerThread::create_node(void){
return (res);
}
+
+
inline void CFlowGenListPerThread::free_node(CGenNode *p){
+ p->free_base();
rte_mempool_sp_put(m_node_pool, p);
}
@@ -4020,6 +4095,8 @@ enum MINVM_PLUGIN_ID{
class CPluginCallback {
public:
+ virtual ~CPluginCallback(){
+ }
virtual void on_node_first(uint8_t plugin_id,CGenNode * node,CFlowYamlInfo * template_info, CTupleTemplateGeneratorSmart * tuple_gen,CFlowGenListPerThread * flow_gen) =0;
virtual void on_node_last(uint8_t plugin_id,CGenNode * node)=0;
virtual rte_mbuf_t * on_node_generate_mbuf(uint8_t plugin_id,CGenNode * node,CFlowPktInfo * pkt_info)=0;
diff --git a/src/common/Network/Packet/IPHeader.cpp b/src/common/Network/Packet/IPHeader.cpp
index 3b90a1aa..c3363603 100755
--- a/src/common/Network/Packet/IPHeader.cpp
+++ b/src/common/Network/Packet/IPHeader.cpp
@@ -52,7 +52,7 @@ void IPHeader::dump(FILE *fd)
{
fprintf(fd, "\nIPHeader");
fprintf(fd, "\nSource 0x%.8lX, Destination 0x%.8lX, Protocol 0x%.1X",
- getSourceIp(), getDestIp(), getProtocol());
+ (ulong)getSourceIp(), (ulong)getDestIp(), (uint)getProtocol());
fprintf(fd, "\nTTL : %d, Id : 0x%.2X, Ver %d, Header Length %d, Total Length %d",
getTimeToLive(), getId(), getVersion(), getHeaderLength(), getTotalLength());
if(isFragmented())
diff --git a/src/common/Network/Packet/TCPHeader.cpp b/src/common/Network/Packet/TCPHeader.cpp
index bf28db2e..1826cef8 100755
--- a/src/common/Network/Packet/TCPHeader.cpp
+++ b/src/common/Network/Packet/TCPHeader.cpp
@@ -25,7 +25,7 @@ void TCPHeader::dump(FILE *fd)
fprintf(fd, "\nSourcePort 0x%.4X, DestPort 0x%.4X",
getSourcePort(), getDestPort());
fprintf(fd, "\nSeqNum 0x%.8lX, AckNum 0x%.8lX, Window %d",
- getSeqNumber(), getAckNumber(), getWindowSize());
+ (ulong)getSeqNumber(), (ulong)getAckNumber(), getWindowSize());
fprintf(fd, "\nHeader Length : %d, Checksum : 0x%.4X",
getHeaderLength(), getChecksum());
fprintf(fd, "\nFlags : SYN - %d, FIN - %d, ACK - %d, URG - %d, RST - %d, PSH - %d",
diff --git a/src/common/c_common.h b/src/common/c_common.h
index d8320aaa..3e43644f 100755
--- a/src/common/c_common.h
+++ b/src/common/c_common.h
@@ -46,7 +46,7 @@ typedef void* c_pvoid;
#ifdef _DEBUG
#define BP_ASSERT(a) assert(a)
#else
- #define BP_ASSERT(a)
+ #define BP_ASSERT(a) (void (a))
#endif
#endif
diff --git a/src/gtest/rpc_test.cpp b/src/gtest/rpc_test.cpp
index 6b8e3eff..34bb02a8 100644
--- a/src/gtest/rpc_test.cpp
+++ b/src/gtest/rpc_test.cpp
@@ -478,6 +478,7 @@ TEST_F(RpcTestOwned, add_remove_stream) {
create_request(request, "get_stream", 1, 1);
request["params"]["stream_id"] = 5;
+ request["params"]["get_pkt"] = true;
send_request(request, response);
@@ -501,6 +502,7 @@ TEST_F(RpcTestOwned, add_remove_stream) {
create_request(request, "get_stream", 1, 1);
request["params"]["stream_id"] = 5;
+ request["params"]["get_pkt"] = true;
send_request(request, response);
@@ -607,17 +609,21 @@ TEST_F(RpcTestOwned, start_stop_traffic) {
/* start port 1 */
create_request(request, "start_traffic", 1, 1);
+ request["params"]["mul"] = 1.0;
send_request(request, response);
+
EXPECT_EQ(response["result"], "ACK");
/* start port 3 */
create_request(request, "start_traffic", 1, 3);
+ request["params"]["mul"] = 1.0;
send_request(request, response);
EXPECT_EQ(response["result"], "ACK");
/* start not configured port */
create_request(request, "start_traffic", 1, 2);
+ request["params"]["mul"] = 1.0;
send_request(request, response);
EXPECT_EQ(response["error"]["code"], -32000);
@@ -633,11 +639,13 @@ TEST_F(RpcTestOwned, start_stop_traffic) {
/* start 1 again */
create_request(request, "start_traffic", 1, 1);
+ request["params"]["mul"] = 1.0;
send_request(request, response);
EXPECT_EQ(response["result"], "ACK");
/* start 1 twice (error) */
create_request(request, "start_traffic", 1, 1);
+ request["params"]["mul"] = 1.0;
send_request(request, response);
EXPECT_EQ(response["error"]["code"], -32000);
@@ -657,3 +665,96 @@ TEST_F(RpcTestOwned, start_stop_traffic) {
EXPECT_EQ(response["result"], "ACK");
}
+
+
+TEST_F(RpcTestOwned, states_check) {
+ Json::Value request;
+ Json::Value response;
+
+ /* add stream #1 */
+ create_request(request, "add_stream", 1, 1);
+ request["params"]["stream_id"] = 5;
+
+ Json::Value stream;
+ create_simple_stream(stream);
+
+ request["params"]["stream"] = stream;
+
+ send_request(request, response);
+ EXPECT_EQ(response["result"], "ACK");
+
+ /* start traffic */
+ create_request(request, "start_traffic", 1, 1);
+ request["params"]["mul"] = 1.0;
+ send_request(request, response);
+ EXPECT_EQ(response["result"], "ACK");
+
+ /* now we cannot add streams */
+ create_request(request, "add_stream", 1, 1);
+ request["params"]["stream_id"] = 15;
+
+ create_simple_stream(stream);
+
+ request["params"]["stream"] = stream;
+
+ send_request(request, response);
+ EXPECT_EQ(response["error"]["code"], -32000);
+
+ /* we cannot remove streams */
+ create_request(request, "remove_stream", 1, 1);
+ request["params"]["stream_id"] = 15;
+ send_request(request, response);
+ EXPECT_EQ(response["error"]["code"], -32000);
+
+ /* cannot start again */
+ create_request(request, "start_traffic", 1, 1);
+ request["params"]["mul"] = 1.0;
+ send_request(request, response);
+ EXPECT_EQ(response["error"]["code"], -32000);
+
+ /* we can stop and add stream / remove */
+
+ create_request(request, "stop_traffic", 1, 1);
+ send_request(request, response);
+ EXPECT_EQ(response["result"], "ACK");
+
+ create_request(request, "add_stream", 1, 1);
+ request["params"]["stream_id"] = 328;
+
+ create_simple_stream(stream);
+
+ request["params"]["stream"] = stream;
+
+ send_request(request, response);
+ EXPECT_EQ(response["result"], "ACK");
+
+
+ create_request(request, "remove_stream", 1, 1);
+ request["params"]["stream_id"] = 15;
+ send_request(request, response);
+ EXPECT_EQ(response["error"]["code"], -32000);
+
+ /* we cannot pause now */
+ create_request(request, "pause_traffic", 1, 1);
+ send_request(request, response);
+ EXPECT_EQ(response["error"]["code"], -32000);
+
+
+ /* start */
+ create_request(request, "start_traffic", 1, 1);
+ request["params"]["mul"] = 1.0;
+ send_request(request, response);
+ EXPECT_EQ(response["result"], "ACK");
+
+ /* now can pause */
+ create_request(request, "pause_traffic", 1, 1);
+ send_request(request, response);
+ EXPECT_EQ(response["result"], "ACK");
+
+ /* also we can resume*/
+ create_request(request, "resume_traffic", 1, 1);
+ send_request(request, response);
+ EXPECT_EQ(response["result"], "ACK");
+
+
+}
diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp
index 0341516c..8640e7db 100644
--- a/src/gtest/trex_stateless_gtest.cpp
+++ b/src/gtest/trex_stateless_gtest.cpp
@@ -1,5 +1,5 @@
/*
- Hanoh Haim
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -22,332 +22,1651 @@ limitations under the License.
#include "bp_sim.h"
#include <common/gtest.h>
#include <common/basic_utils.h>
-
+#include <trex_stateless_dp_core.h>
+#include <trex_stateless_messaging.h>
+#include <trex_streams_compiler.h>
+#include <trex_stream_node.h>
+#include <trex_stream.h>
+#include <trex_stateless_port.h>
+#include <trex_rpc_server_api.h>
+#include <iostream>
#define EXPECT_EQ_UINT32(a,b) EXPECT_EQ((uint32_t)(a),(uint32_t)(b))
-// one stream info with const packet , no VM
-class CTRexDpStatelessVM {
+
+/* basic stateless test */
+class basic_stl : public testing::Test {
+ protected:
+ virtual void SetUp() {
+ }
+ virtual void TearDown() {
+ }
+public:
};
-//- add dump function
-// - check one object
-// create frame work
-class CTRexDpStreamModeContinues{
-public:
- void set_pps(double pps){
- m_pps=pps;
- }
- double get_pps(){
- return (m_pps);
- }
+/**
+ * Queue of RPC msgs for test
+ *
+ * @author hhaim
+ */
- void dump(FILE *fd);
-private:
- double m_pps;
+class CBasicStl;
+
+
+struct CBasicStlDelayCommand {
+
+ CBasicStlDelayCommand(){
+ m_node=NULL;
+ }
+ CGenNodeCommand * m_node;
};
-void CTRexDpStreamModeContinues::dump(FILE *fd){
- fprintf (fd," pps : %f \n",m_pps);
-}
+class CBasicStlMsgQueue {
+
+friend CBasicStl;
-class CTRexDpStreamModeSingleBurst{
public:
- void set_pps(double pps){
- m_pps=pps;
- }
- double get_pps(){
- return (m_pps);
+ CBasicStlMsgQueue(){
}
- void set_total_packets(uint64_t total_packets){
- m_total_packets =total_packets;
+ /* user will allocate the message, no need to free it by this module */
+ void add_msg(TrexStatelessCpToDpMsgBase * msg){
+ m_msgs.push_back(msg);
}
- uint64_t get_total_packets(){
- return (m_total_packets);
+ void add_command(CBasicStlDelayCommand & command){
+ m_commands.push_back(command);
}
- void dump(FILE *fd);
+ /* only if both port are idle we can exit */
+ void add_command(CFlowGenListPerThread * core,
+ TrexStatelessCpToDpMsgBase * msg,
+ double time){
-private:
- double m_pps;
- uint64_t m_total_packets;
-};
+ CGenNodeCommand *node = (CGenNodeCommand *)core->create_node() ;
+ node->m_type = CGenNode::COMMAND;
-void CTRexDpStreamModeSingleBurst::dump(FILE *fd){
- fprintf (fd," pps : %f \n",m_pps);
- fprintf (fd," total_packets : %llu \n",m_total_packets);
-}
+ node->m_cmd = msg;
+ /* make sure it will be scheduled after the current node */
+ node->m_time = time ;
-class CTRexDpStreamModeMultiBurst{
-public:
- void set_pps(double pps){
- m_pps=pps;
- }
- double get_pps(){
- return (m_pps);
- }
+ CBasicStlDelayCommand command;
+ command.m_node =node;
- void set_pkts_per_burst(uint64_t pkts_per_burst){
- m_pkts_per_burst =pkts_per_burst;
+ add_command(command);
}
- uint64_t get_pkts_per_burst(){
- return (m_pkts_per_burst);
- }
- void set_ibg(double ibg){
- m_ibg = ibg;
+ void clear(){
+ m_msgs.clear();
+ m_commands.clear();
}
- double get_ibg(){
- return ( m_ibg );
+
+protected:
+ std::vector<TrexStatelessCpToDpMsgBase *> m_msgs;
+
+ std::vector<CBasicStlDelayCommand> m_commands;
+};
+
+
+
+class CBasicStlSink {
+
+public:
+ CBasicStlSink(){
+ m_core=0;
}
+ virtual void call_after_init(CBasicStl * m_obj)=0;
+ virtual void call_after_run(CBasicStl * m_obj)=0;
- void set_number_of_bursts(uint32_t number_of_bursts){
- m_number_of_bursts = number_of_bursts;
+ CFlowGenListPerThread * m_core;
+};
+
+
+/**
+ * handler for DP to CP messages
+ *
+ * @author imarom (19-Nov-15)
+ */
+class DpToCpHandler {
+public:
+ virtual void handle(TrexStatelessDpToCpMsgBase *msg) = 0;
+};
+
+class CBasicStl {
+
+public:
+
+
+ CBasicStl(){
+ m_time_diff=0.001;
+ m_threads=1;
+ m_dump_json=false;
+ m_dp_to_cp_handler = NULL;
+ m_msg = NULL;
+ m_sink = NULL;
}
- uint32_t get_number_of_bursts(){
- return (m_number_of_bursts);
+
+ void flush_dp_to_cp_messages() {
+
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(0);
+
+ while ( true ) {
+ CGenNode * node = NULL;
+ if (ring->Dequeue(node) != 0) {
+ break;
+ }
+ assert(node);
+
+ TrexStatelessDpToCpMsgBase * msg = (TrexStatelessDpToCpMsgBase *)node;
+ if (m_dp_to_cp_handler) {
+ m_dp_to_cp_handler->handle(msg);
+ }
+
+ delete msg;
+ }
+
}
- void dump(FILE *fd);
-private:
- double m_pps;
- double m_ibg; // inter burst gap
- uint64_t m_pkts_per_burst;
- uint32_t m_number_of_bursts;
+ bool init(void){
+
+ CErfIFStl erf_vif;
+ fl.Create();
+ fl.generate_p_thread_info(1);
+ CFlowGenListPerThread * lpt;
+
+ fl.m_threads_info[0]->set_vif(&erf_vif);
+
+ CErfCmp cmp;
+ cmp.dump=1;
+
+ CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
+
+ m_ring_from_cp = cp_dp->getRingCpToDp(0);
+
+
+ bool res=true;
+
+ lpt=fl.m_threads_info[0];
+
+ if ( m_sink ){
+ m_sink->m_core =lpt;
+ }
+
+ char buf[100];
+ char buf_ex[100];
+ sprintf(buf,"%s-%d.erf",CGlobalInfo::m_options.out_file.c_str(),0);
+ sprintf(buf_ex,"%s-%d-ex.erf",CGlobalInfo::m_options.out_file.c_str(),0);
+
+ lpt->start_stateless_simulation_file(buf,CGlobalInfo::m_options.preview);
+
+ /* add stream to the queue */
+ if ( m_msg ) {
+ assert(m_ring_from_cp->Enqueue((CGenNode *)m_msg)==0);
+ }
+ if (m_msg_queue.m_msgs.size()>0) {
+ for (auto msg : m_msg_queue.m_msgs) {
+ assert(m_ring_from_cp->Enqueue((CGenNode *)msg)==0);
+ }
+ }
+
+ if (m_sink) {
+ m_sink->call_after_init(this);
+ }
+
+ /* add the commands */
+ if (m_msg_queue.m_commands.size()>0) {
+ for (auto cmd : m_msg_queue.m_commands) {
+ /* add commands nodes */
+ lpt->m_node_gen.add_node((CGenNode *)cmd.m_node);
+ }
+ }
+
+ lpt->start_stateless_daemon_simulation();
+
+
+ //lpt->m_node_gen.DumpHist(stdout);
+
+ cmp.d_sec = m_time_diff;
+ if ( cmp.compare(std::string(buf),std::string(buf_ex)) != true ) {
+ res=false;
+ }
+
+ if ( m_dump_json ){
+ printf(" dump json ...........\n");
+ std::string s;
+ fl.m_threads_info[0]->m_node_gen.dump_json(s);
+ printf(" %s \n",s.c_str());
+ }
+
+ if (m_sink) {
+ m_sink->call_after_run(this);
+ }
+
+ flush_dp_to_cp_messages();
+ m_msg_queue.clear();
+
+
+ fl.Delete();
+ return (res);
+ }
+
+public:
+ int m_threads;
+ double m_time_diff;
+ bool m_dump_json;
+ DpToCpHandler *m_dp_to_cp_handler;
+ CBasicStlSink * m_sink;
+
+ TrexStatelessCpToDpMsgBase * m_msg;
+ CNodeRing *m_ring_from_cp;
+ CBasicStlMsgQueue m_msg_queue;
+ CFlowGenList fl;
};
-void CTRexDpStreamModeMultiBurst::dump(FILE *fd){
- fprintf (fd," pps : %f \n",m_pps);
- fprintf (fd," total_packets : %llu \n",m_pkts_per_burst);
- fprintf (fd," ibg : %f \n",m_ibg);
- fprintf (fd," num_of_bursts : %llu \n",m_number_of_bursts);
-}
+class CPcapLoader {
+public:
+ CPcapLoader();
+ ~CPcapLoader();
-class CTRexDpStreamMode {
public:
- enum MODES {
- moCONTINUES = 0x0,
- moSINGLE_BURST = 0x1,
- moMULTI_BURST = 0x2
- } ;
- typedef uint8_t MODE_TYPE_t;
+ bool load_pcap_file(std::string file,int pkt_id=0);
+ void update_ip_src(uint32_t ip_addr);
+ void clone_packet_into_stream(TrexStream * stream);
+ void dump_packet();
- void reset();
+public:
+ bool m_valid;
+ CCapPktRaw m_raw;
+ CPacketIndication m_pkt_indication;
+};
- void set_mode(MODE_TYPE_t mode ){
- m_type = mode;
- }
+CPcapLoader::~CPcapLoader(){
+}
- MODE_TYPE_t get_mode(){
- return (m_type);
+bool CPcapLoader::load_pcap_file(std::string cap_file,int pkt_id){
+ m_valid=false;
+ CPacketParser parser;
+
+ CCapReaderBase * lp=CCapReaderFactory::CreateReader((char *)cap_file.c_str(),0);
+
+ if (lp == 0) {
+ printf(" ERROR file %s does not exist or not supported \n",(char *)cap_file.c_str());
+ return false;
}
+ int cnt=0;
+ bool found =false;
+
- CTRexDpStreamModeContinues & cont(void){
- return (m_data.m_cont);
+ while ( true ) {
+ /* read packet */
+ if ( lp->ReadPacket(&m_raw) ==false ){
+ break;
+ }
+ if (cnt==pkt_id) {
+ found = true;
+ break;
+ }
+ cnt++;
}
- CTRexDpStreamModeSingleBurst & single_burst(void){
- return (m_data.m_signle_burst);
+ if ( found ){
+ if ( parser.ProcessPacket(&m_pkt_indication, &m_raw) ){
+ m_valid = true;
+ }
}
- CTRexDpStreamModeMultiBurst & multi_burst(void){
- return (m_data.m_multi_burst);
+ delete lp;
+ return (m_valid);
+}
+
+void CPcapLoader::update_ip_src(uint32_t ip_addr){
+
+ if ( m_pkt_indication.l3.m_ipv4 ) {
+ m_pkt_indication.l3.m_ipv4->setSourceIp(ip_addr);
+ m_pkt_indication.l3.m_ipv4->updateCheckSum();
}
+}
+
+void CPcapLoader::clone_packet_into_stream(TrexStream * stream){
+
+ uint16_t pkt_size=m_raw.getTotalLen();
+
+ uint8_t *binary = new uint8_t[pkt_size];
+ memcpy(binary,m_raw.raw,pkt_size);
+ stream->m_pkt.binary = binary;
+ stream->m_pkt.len = pkt_size;
+}
- void dump(FILE *fd);
-private:
- uint8_t m_type;
- union Data {
- CTRexDpStreamModeContinues m_cont;
- CTRexDpStreamModeSingleBurst m_signle_burst;
- CTRexDpStreamModeMultiBurst m_multi_burst;
- } m_data;
-};
-void CTRexDpStreamMode::reset(){
- m_type =CTRexDpStreamMode::moCONTINUES;
- memset(&m_data,0,sizeof(m_data));
+CPcapLoader::CPcapLoader(){
+
}
-
-void CTRexDpStreamMode::dump(FILE *fd){
- const char * table[3] = {"CONTINUES","SINGLE_BURST","MULTI_BURST"};
-
- fprintf(fd," mode : %s \n", (char*)table[m_type]);
- switch (m_type) {
- case CTRexDpStreamMode::moCONTINUES :
- cont().dump(fd);
- break;
- case CTRexDpStreamMode::moSINGLE_BURST :
- single_burst().dump(fd);
- break;
- case CTRexDpStreamMode::moMULTI_BURST :
- multi_burst().dump(fd);
- break;
- default:
- fprintf(fd," ERROR type if not valid %d \n",m_type);
- break;
+
+void CPcapLoader::dump_packet(){
+ if (m_valid ) {
+ m_pkt_indication.Dump(stdout,1);
+ }else{
+ fprintf(stdout," no packets were found \n");
}
}
+TEST_F(basic_stl, load_pcap_file) {
+ printf (" stateles %d \n",(int)sizeof(CGenNodeStateless));
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+
+ //pcap.dump_packet();
+}
-class CTRexDpStatelessStream {
+class CBBStartPause0: public CBasicStlSink {
public:
- enum FLAGS_0{
- _ENABLE = 0,
- _SELF_START = 1,
- _VM_ENABLE =2,
- _END_STREAM =-1
+
+ virtual void call_after_init(CBasicStl * m_obj);
+ virtual void call_after_run(CBasicStl * m_obj){
};
+ uint8_t m_port_id;
+};
- CTRexDpStatelessStream(){
- reset();
- }
- void reset(){
- m_packet =0;
- m_vm=0;
- m_flags=0;
- m_isg_sec=0.0;
- m_next_stream = CTRexDpStatelessStream::_END_STREAM ; // END
- m_mode.reset();
- }
- void set_enable(bool enable){
- btSetMaskBit32(m_flags,_ENABLE,_ENABLE,enable?1:0);
- }
+void CBBStartPause0::call_after_init(CBasicStl * m_obj){
- bool get_enabled(){
- return (btGetMaskBit32(m_flags,_ENABLE,_ENABLE)?true:false);
- }
+ TrexStatelessDpPause * lpPauseCmd = new TrexStatelessDpPause(m_port_id);
+ TrexStatelessDpResume * lpResumeCmd1 = new TrexStatelessDpResume(m_port_id);
- void set_self_start(bool enable){
- btSetMaskBit32(m_flags,_SELF_START,_SELF_START,enable?1:0);
- }
+ m_obj->m_msg_queue.add_command(m_core,lpPauseCmd, 5.0); /* command in delay of 5 sec */
+ m_obj->m_msg_queue.add_command(m_core,lpResumeCmd1, 7.0);/* command in delay of 7 sec */
- bool get_self_start(bool enable){
- return (btGetMaskBit32(m_flags,_SELF_START,_SELF_START)?true:false);
- }
+}
- /* if we don't have VM we could just replicate the mbuf and allocate it once */
- void set_vm_enable(bool enable){
- btSetMaskBit32(m_flags,_VM_ENABLE,_VM_ENABLE,enable?1:0);
- }
- bool get_vm_enabled(bool enable){
- return (btGetMaskBit32(m_flags,_VM_ENABLE,_VM_ENABLE)?true:false);
- }
+/* start/stop/stop back to back */
+TEST_F(basic_stl, basic_pause_resume0) {
- void set_inter_stream_gap(double isg_sec){
- m_isg_sec =isg_sec;
- }
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_basic_pause_resume0";
- double get_inter_stream_gap(){
- return (m_isg_sec);
- }
+ TrexStreamsCompiler compile;
- CTRexDpStreamMode & get_mode();
+ uint8_t port_id=0;
+ std::vector<TrexStream *> streams;
- // CTRexDpStatelessStream::_END_STREAM for END
- void set_next_stream(int32_t next_stream){
- m_next_stream =next_stream;
- }
+ TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0);
+ stream1->set_pps(1.0);
- int32_t get_next_stream(void){
- return ( m_next_stream );
- }
+
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_port_id= port_id;
- void dump(FILE *fd);
-private:
- char * m_packet;
- CTRexDpStatelessVM * m_vm;
- uint32_t m_flags;
- double m_isg_sec; // in second
- CTRexDpStreamMode m_mode;
- int32_t m_next_stream; // next stream id
-};
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
-//- list of streams info with const packet , no VM
-// - object that include the stream /scheduler/ packet allocation / need to create an object for one thread that works for test
-// generate pcap file and compare it
+ // stream - clean
-#if 0
-void CTRexDpStatelessStream::dump(FILE *fd){
+ TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
- fprintf(fd," enabled : %d \n",get_enabled()?1:0);
- fprintf(fd," self_start : %d \n",get_self_start()?1:0);
- fprintf(fd," vm : %d \n",get_vm_enabled()?1:0);
- fprintf(" isg : %f \n",m_isg_sec);
- m_mode.dump(fd);
- if (m_next_stream == CTRexDpStatelessStream::_END_STREAM ) {
- fprintf(fd," action : End of Stream \n");
- }else{
- fprintf(" next : %d \n",m_next_stream);
- }
+ assert(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
+
+ t1.m_msg_queue.add_msg(lpStartCmd);
+
+
+ CBBStartPause0 sink;
+ sink.m_port_id = port_id;
+ t1.m_sink = &sink;
+
+ bool res=t1.init();
+
+ delete stream1 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
}
+//////////////////////////////////////////////////////////////
-class CTRexStatelessBasic {
+class CBBStartStopDelay2: public CBasicStlSink {
public:
- CTRexStatelessBasic(){
- m_threads=1;
- }
- bool init(void){
- return (true);
- }
+ virtual void call_after_init(CBasicStl * m_obj);
+ virtual void call_after_run(CBasicStl * m_obj){
+ };
+ uint8_t m_port_id;
+};
+
+
+
+void CBBStartStopDelay2::call_after_init(CBasicStl * m_obj){
+
+ TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(m_port_id);
+ TrexStatelessDpStop * lpStopCmd1 = new TrexStatelessDpStop(m_port_id);
+
+
+ TrexStreamsCompiler compile;
+
+ uint8_t port_id=0;
+
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0);
+ stream1->set_pps(1.0);
+
+
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_port_id= port_id;
+
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000002);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+ // stream - clean
+
+ TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
+
+ assert(compile.compile(streams, comp_obj) );
+
+
+ /* start with different event id */
+ TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(m_port_id, 1, comp_obj.clone(), 10.0 /*sec */ );
+
+
+ m_obj->m_msg_queue.add_command(m_core,lpStopCmd, 5.0); /* command in delay of 5 sec */
+ m_obj->m_msg_queue.add_command(m_core,lpStopCmd1, 7.0);/* command in delay of 7 sec */
+ m_obj->m_msg_queue.add_command(m_core,lpStartCmd, 7.5);/* command in delay of 7 sec */
+
+ delete stream1 ;
+
+
+}
+
+
+
+/* start/stop/stop back to back */
+TEST_F(basic_stl, single_pkt_bb_start_stop_delay2) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_bb_start_stop_delay2";
+
+ TrexStreamsCompiler compile;
+
+ uint8_t port_id=0;
+
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0);
+ stream1->set_pps(1.0);
+
+
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_port_id= port_id;
+
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+ // stream - clean
+
+ TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
+
+ assert(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
+
+ t1.m_msg_queue.add_msg(lpStartCmd);
+
+
+ CBBStartStopDelay2 sink;
+ sink.m_port_id = port_id;
+ t1.m_sink = &sink;
+
+ bool res=t1.init();
+
+ delete stream1 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
+
+
+
+
+
+class CBBStartStopDelay1: public CBasicStlSink {
public:
- bool m_threads;
+
+ virtual void call_after_init(CBasicStl * m_obj);
+ virtual void call_after_run(CBasicStl * m_obj){
+ };
+ uint8_t m_port_id;
};
-/* stateless basic */
-class dp_sl_basic : public testing::Test {
- protected:
- virtual void SetUp() {
- }
- virtual void TearDown() {
- }
+
+void CBBStartStopDelay1::call_after_init(CBasicStl * m_obj){
+
+ TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(m_port_id);
+ TrexStatelessDpStop * lpStopCmd1 = new TrexStatelessDpStop(m_port_id);
+
+ m_obj->m_msg_queue.add_command(m_core,lpStopCmd, 5.0); /* command in delay of 5 sec */
+ m_obj->m_msg_queue.add_command(m_core,lpStopCmd1, 7.0);/* command in delay of 7 sec */
+}
+
+
+
+/* start/stop/stop back to back */
+TEST_F(basic_stl, single_pkt_bb_start_stop_delay1) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_bb_start_stop_delay1";
+
+ TrexStreamsCompiler compile;
+
+ uint8_t port_id=0;
+
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0);
+ stream1->set_pps(1.0);
+
+
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_port_id= port_id;
+
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+ // stream - clean
+
+ TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
+
+ assert(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
+
+ t1.m_msg_queue.add_msg(lpStartCmd);
+
+
+ CBBStartStopDelay1 sink;
+ sink.m_port_id = port_id;
+ t1.m_sink = &sink;
+
+ bool res=t1.init();
+
+ delete stream1 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
+/* start/stop/stop back to back */
+TEST_F(basic_stl, single_pkt_bb_start_stop3) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_bb_start_stop3";
+
+ TrexStreamsCompiler compile;
+
+ uint8_t port_id=0;
+
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0);
+ stream1->set_pps(1.0);
+
+
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_port_id= port_id;
+
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+ // stream - clean
+
+ TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
+
+ assert(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
+ TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(port_id);
+ TrexStatelessDpStop * lpStopCmd1 = new TrexStatelessDpStop(port_id);
+
+
+ t1.m_msg_queue.add_msg(lpStartCmd);
+ t1.m_msg_queue.add_msg(lpStopCmd);
+ t1.m_msg_queue.add_msg(lpStopCmd1);
+
+ bool res=t1.init();
+
+ delete stream1 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
+TEST_F(basic_stl, single_pkt_bb_start_stop2) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_bb_start_stop2";
+
+ TrexStreamsCompiler compile;
+
+ uint8_t port_id=0;
+
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0);
+ stream1->set_pps(1.0);
+
+
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_port_id= port_id;
+
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+ // stream - clean
+
+ TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
+
+ assert(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
+ TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(port_id);
+ TrexStatelessDpStart * lpStartCmd1 = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
+
+
+ t1.m_msg_queue.add_msg(lpStartCmd);
+ t1.m_msg_queue.add_msg(lpStopCmd);
+ t1.m_msg_queue.add_msg(lpStartCmd1);
+
+ bool res=t1.init();
+
+ delete stream1 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
+
+/* back to back send start/stop */
+TEST_F(basic_stl, single_pkt_bb_start_stop) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_bb_start_stop";
+
+ TrexStreamsCompiler compile;
+
+ uint8_t port_id=0;
+
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0);
+ stream1->set_pps(1.0);
+
+
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_port_id= port_id;
+
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+ // stream - clean
+
+ TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
+
+ assert(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
+ TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(port_id);
+
+
+ t1.m_msg_queue.add_msg(lpStartCmd);
+ t1.m_msg_queue.add_msg(lpStopCmd);
+
+ bool res=t1.init();
+
+ delete stream1 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
+
+
+TEST_F(basic_stl, simple_prog4) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_simple_prog4";
+
+ TrexStreamsCompiler compile;
+
+
+ std::vector<TrexStream *> streams;
+
+
+ /* stream0 */
+ TrexStream * stream0 = new TrexStream(TrexStream::stCONTINUOUS, 0,300);
+ stream0->set_pps(1.0);
+ stream0->m_enabled = true;
+ stream0->m_self_start = true;
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000000);
+ pcap.clone_packet_into_stream(stream0);
+ streams.push_back(stream0);
+
+
+ /* stream1 */
+ TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST, 0,100);
+ stream1->set_pps(1.0);
+ stream1->set_single_burst(5);
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_next_stream_id=200;
+
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+
+ /* stream1 */
+
+ TrexStream * stream2 = new TrexStream(TrexStream::stMULTI_BURST, 0,200);
+ stream2->set_pps(1.0);
+ stream2->m_isg_usec = 1000000; /*time betwean stream 1 to stream 2 */
+ stream2->m_enabled = true;
+ stream2->m_self_start = false;
+ stream2->set_multi_burst(5,
+ 3,
+ 2000000.0);
+
+ // next stream is 100 - loop
+ stream2->m_next_stream_id=100;
+
+
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000002);
+ pcap.clone_packet_into_stream(stream2);
+ streams.push_back(stream2);
+
+
+ TrexStreamsCompiledObj comp_obj(0,1.0);
+
+ EXPECT_TRUE(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 20.0 );
+
+
+ t1.m_msg = lpstart;
+
+ bool res=t1.init();
+
+ delete stream0 ;
+ delete stream1 ;
+ delete stream2 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
+
+TEST_F(basic_stl, simple_prog3) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_simple_prog3";
+
+ TrexStreamsCompiler compile;
+
+
+ std::vector<TrexStream *> streams;
+
+ /* stream1 */
+ TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST, 0,100);
+ stream1->set_pps(1.0);
+ stream1->set_single_burst(5);
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_next_stream_id=200;
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+
+ /* stream1 */
+
+ TrexStream * stream2 = new TrexStream(TrexStream::stMULTI_BURST, 0,200);
+ stream2->set_pps(1.0);
+ stream2->m_isg_usec = 1000000; /*time betwean stream 1 to stream 2 */
+ stream2->m_enabled = true;
+ stream2->m_self_start = false;
+ stream2->set_multi_burst(5,
+ 3,
+ 2000000.0);
+
+ // next stream is 100 - loop
+ stream2->m_next_stream_id=100;
+
+
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000002);
+ pcap.clone_packet_into_stream(stream2);
+ streams.push_back(stream2);
+
+
+ TrexStreamsCompiledObj comp_obj(0,1.0);
+
+ EXPECT_TRUE(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 50.0 );
+
+
+ t1.m_msg = lpstart;
+
+ bool res=t1.init();
+
+ delete stream1 ;
+ delete stream2 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
+TEST_F(basic_stl, simple_prog2) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_simple_prog2";
+
+ TrexStreamsCompiler compile;
+
+
+ std::vector<TrexStream *> streams;
+
+ /* stream1 */
+ TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST, 0,100);
+ stream1->set_pps(1.0);
+ stream1->set_single_burst(5);
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_next_stream_id=200;
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+
+ /* stream1 */
+
+ TrexStream * stream2 = new TrexStream(TrexStream::stSINGLE_BURST, 0,200);
+ stream2->set_pps(1.0);
+ stream2->set_single_burst(5);
+ stream2->m_isg_usec = 2000000; /*time betwean stream 1 to stream 2 */
+ stream2->m_enabled = true;
+ stream2->m_self_start = false;
+
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000002);
+ pcap.clone_packet_into_stream(stream2);
+ streams.push_back(stream2);
+
+
+ TrexStreamsCompiledObj comp_obj(0,1.0);
+
+ EXPECT_TRUE(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 10.0 );
+
+
+ t1.m_msg = lpstart;
+
+ bool res=t1.init();
+
+ delete stream1 ;
+ delete stream2 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
+
+TEST_F(basic_stl, simple_prog1) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_simple_prog1";
+
+ TrexStreamsCompiler compile;
+
+
+ std::vector<TrexStream *> streams;
+
+ /* stream1 */
+ TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST, 0,100);
+ stream1->set_pps(1.0);
+ stream1->set_single_burst(5);
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_next_stream_id=200;
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+
+ /* stream1 */
+
+ TrexStream * stream2 = new TrexStream(TrexStream::stSINGLE_BURST, 0,200);
+ stream2->set_pps(1.0);
+ stream2->set_single_burst(5);
+ stream2->m_enabled = true;
+ stream2->m_self_start = false;
+
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000002);
+ pcap.clone_packet_into_stream(stream2);
+ streams.push_back(stream2);
+
+
+ TrexStreamsCompiledObj comp_obj(0,1.0);
+
+ EXPECT_TRUE(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 10.0 );
+
+
+ t1.m_msg = lpstart;
+
+ bool res=t1.init();
+
+ delete stream1 ;
+ delete stream2 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
+
+TEST_F(basic_stl, single_pkt_burst1) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_single_pkt_burst1";
+
+ TrexStreamsCompiler compile;
+
+
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST, 0,0);
+ stream1->set_pps(1.0);
+ stream1->set_single_burst(5);
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+ TrexStreamsCompiledObj comp_obj(0,1.0);
+
+ assert(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 10.0 );
+
+
+ t1.m_msg = lpstart;
+
+ bool res=t1.init();
+
+ delete stream1 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
+
+
+TEST_F(basic_stl, single_pkt) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_single_stream";
+
+ TrexStreamsCompiler compile;
+
+ uint8_t port_id=0;
+
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0);
+ stream1->set_pps(1.0);
+
+
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_port_id= port_id;
+
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+ // stream - clean
+
+ TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
+
+ assert(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
+
+
+ t1.m_msg = lpstart;
+
+ bool res=t1.init();
+
+ delete stream1 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
+TEST_F(basic_stl, multi_pkt1) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_multi_pkt1";
+
+ TrexStreamsCompiler compile;
+
+
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0);
+ stream1->set_pps(1.0);
+
+
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+ TrexStream * stream2 = new TrexStream(TrexStream::stCONTINUOUS,0,1);
+ stream2->set_pps(2.0);
+
+ stream2->m_enabled = true;
+ stream2->m_self_start = true;
+ stream2->m_isg_usec = 1000.0; /* 1 msec */
+ pcap.update_ip_src(0x20000001);
+ pcap.clone_packet_into_stream(stream2);
+
+ streams.push_back(stream2);
+
+
+ // stream - clean
+ TrexStreamsCompiledObj comp_obj(0,1.0);
+
+ assert(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 10 );
+
+ t1.m_msg = lpstart;
+
+ bool res=t1.init();
+
+ delete stream1 ;
+ delete stream2 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
+
+
+
+/* check disabled stream with multiplier of 5*/
+TEST_F(basic_stl, multi_pkt2) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_multi_pkt2";
+
+ TrexStreamsCompiler compile;
+
+
+ std::vector<TrexStream *> streams;
+
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0);
+ stream1->set_pps(1.0);
+
+
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+
+ TrexStream * stream2 = new TrexStream(TrexStream::stCONTINUOUS,0,1);
+ stream2->set_pps(2.0);
+
+ stream2->m_enabled = false;
+ stream2->m_self_start = false;
+ stream2->m_isg_usec = 1000.0; /* 1 msec */
+ pcap.update_ip_src(0x20000001);
+ pcap.clone_packet_into_stream(stream2);
+
+ streams.push_back(stream2);
+
+
+ // stream - clean
+ TrexStreamsCompiledObj comp_obj(0,5.0);
+
+ assert(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 10 );
+
+ t1.m_msg = lpstart;
+
+ bool res=t1.init();
+
+ delete stream1 ;
+ delete stream2 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
+TEST_F(basic_stl, multi_burst1) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_multi_burst1";
+
+ TrexStreamsCompiler compile;
+
+
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stMULTI_BURST,0,0);
+ stream1->set_pps(1.0);
+ stream1->set_multi_burst(5,
+ 3,
+ 2000000.0);
+
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+ TrexStreamsCompiledObj comp_obj(0,1.0);
+
+ assert(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 40 );
+
+
+ t1.m_msg = lpstart;
+
+ bool res=t1.init();
+
+ delete stream1 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+/********************************************* Itay Tests Start *************************************/
+
+/**
+ * check that continous stream does not point to another stream
+ * (makes no sense)
+ */
+TEST_F(basic_stl, compile_bad_1) {
+
+ TrexStreamsCompiler compile;
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,2);
+ stream1->m_enabled = true;
+ stream1->set_pps(52.0);
+ stream1->m_next_stream_id = 3;
+
+ streams.push_back(stream1);
+
+ TrexStreamsCompiledObj comp_obj(0,1.0);
+
+ std::string err_msg;
+ EXPECT_FALSE(compile.compile(streams, comp_obj, &err_msg));
+
+ delete stream1;
+
+}
+
+/**
+ * check for streams pointing to non exsistant streams
+ *
+ * @author imarom (16-Nov-15)
+ */
+TEST_F(basic_stl, compile_bad_2) {
+
+ TrexStreamsCompiler compile;
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST,0,1);
+ stream1->m_enabled = true;
+ stream1->set_pps(1.0);
+ stream1->set_single_burst(200);
+
+ /* non existant next stream */
+ stream1->m_next_stream_id = 5;
+
+ TrexStream * stream2 = new TrexStream(TrexStream::stCONTINUOUS,0,2);
+ stream1->set_pps(52.0);
+
+ streams.push_back(stream1);
+ streams.push_back(stream2);
+
+ TrexStreamsCompiledObj comp_obj(0,1.0);
+
+ std::string err_msg;
+ EXPECT_FALSE(compile.compile(streams, comp_obj, &err_msg));
+
+ delete stream1;
+ delete stream2;
+
+}
+
+/**
+ * check for "dead streams" in the mesh
+ * a streams that cannot be reached
+ *
+ * @author imarom (16-Nov-15)
+ */
+TEST_F(basic_stl, compile_bad_3) {
+
+ TrexStreamsCompiler compile;
+ std::vector<TrexStream *> streams;
+ TrexStream *stream;
+
+ /* stream 1 */
+ stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 231);
+ stream->m_enabled = true;
+ stream->set_pps(1.0);
+ stream->set_single_burst(200);
+
+ stream->m_next_stream_id = 5481;
+ stream->m_self_start = true;
+
+ streams.push_back(stream);
+
+ /* stream 2 */
+ stream = new TrexStream(TrexStream::stCONTINUOUS, 0, 5481);
+ stream->m_enabled = true;
+ stream->m_next_stream_id = -1;
+ stream->m_self_start = false;
+ stream->set_pps(52.0);
+
+ streams.push_back(stream);
+
+ /* stream 3 */
+
+ stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 1928);
+ stream->m_enabled = true;
+ stream->set_pps(1.0);
+ stream->set_single_burst(200);
+
+ stream->m_next_stream_id = -1;
+ stream->m_self_start = true;
+
+ streams.push_back(stream);
+
+ /* stream 4 */
+
+ stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 41231);
+ stream->m_enabled = true;
+ stream->set_pps(1.0);
+ stream->set_single_burst(200);
+
+ stream->m_next_stream_id = 3928;
+ stream->m_self_start = false;
+
+ streams.push_back(stream);
+
+ /* stream 5 */
+
+ stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 3928);
+ stream->m_enabled = true;
+ stream->set_pps(1.0);
+ stream->set_single_burst(200);
+
+ stream->m_next_stream_id = 41231;
+ stream->m_self_start = false;
+
+ streams.push_back(stream);
+
+ /* compile */
+ TrexStreamsCompiledObj comp_obj(0,1.0);
+
+ std::string err_msg;
+ EXPECT_FALSE(compile.compile(streams, comp_obj, &err_msg));
+
+ for (auto stream : streams) {
+ delete stream;
+ }
+
+}
+
+TEST_F(basic_stl, compile_with_warnings) {
+
+ TrexStreamsCompiler compile;
+ std::vector<TrexStream *> streams;
+ TrexStream *stream;
+
+ /* stream 1 */
+ stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 231);
+ stream->m_enabled = true;
+ stream->set_pps(1.0);
+ stream->set_single_burst(200);
+
+ stream->m_next_stream_id = 1928;
+ stream->m_self_start = true;
+
+ streams.push_back(stream);
+
+ /* stream 2 */
+ stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 5481);
+ stream->m_enabled = true;
+ stream->m_next_stream_id = 1928;
+ stream->m_self_start = true;
+ stream->set_pps(52.0);
+
+ streams.push_back(stream);
+
+ /* stream 3 */
+
+ stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 1928);
+ stream->m_enabled = true;
+ stream->set_pps(1.0);
+ stream->set_single_burst(200);
+
+ stream->m_next_stream_id = -1;
+ stream->m_self_start = true;
+
+ streams.push_back(stream);
+
+
+
+ /* compile */
+ TrexStreamsCompiledObj comp_obj(0,1.0);
+
+ std::string err_msg;
+ EXPECT_TRUE(compile.compile(streams, comp_obj, &err_msg));
+
+ EXPECT_TRUE(compile.get_last_compile_warnings().size() == 1);
+
+ for (auto stream : streams) {
+ delete stream;
+ }
+
+}
+
+
+TEST_F(basic_stl, compile_good_stream_id_compres) {
+
+ TrexStreamsCompiler compile;
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST,0,700);
+ stream1->m_self_start = true;
+ stream1->m_enabled = true;
+ stream1->set_pps(1.0);
+ stream1->set_single_burst(200);
+
+ /* non existant next stream */
+ stream1->m_next_stream_id = 800;
+
+
+ TrexStream * stream2 = new TrexStream(TrexStream::stSINGLE_BURST,0,800);
+ stream2->set_pps(52.0);
+ stream2->m_enabled = true;
+ stream2->m_next_stream_id = 700;
+ stream2->set_single_burst(300);
+
+
+ streams.push_back(stream1);
+ streams.push_back(stream2);
+
+ TrexStreamsCompiledObj comp_obj(0,1.0);
+
+ std::string err_msg;
+ EXPECT_TRUE(compile.compile(streams, comp_obj, &err_msg));
+
+ printf(" %s \n",err_msg.c_str());
+
+ comp_obj.Dump(stdout);
+
+ EXPECT_EQ_UINT32(comp_obj.get_objects()[0].m_stream->m_stream_id,0);
+ EXPECT_EQ_UINT32(comp_obj.get_objects()[0].m_stream->m_next_stream_id,1);
+
+ EXPECT_EQ_UINT32(comp_obj.get_objects()[1].m_stream->m_stream_id,1);
+ EXPECT_EQ_UINT32(comp_obj.get_objects()[1].m_stream->m_next_stream_id,0);
+
+ delete stream1;
+ delete stream2;
+
+}
+
+
+
+class DpToCpHandlerStopEvent: public DpToCpHandler {
public:
+ DpToCpHandlerStopEvent(int event_id) {
+ m_event_id = event_id;
+ }
+
+ virtual void handle(TrexStatelessDpToCpMsgBase *msg) {
+ /* first the message must be an event */
+ TrexDpPortEventMsg *event = dynamic_cast<TrexDpPortEventMsg *>(msg);
+ EXPECT_TRUE(event != NULL);
+ EXPECT_TRUE(event->get_event_type() == TrexDpPortEvent::EVENT_STOP);
+
+ EXPECT_TRUE(event->get_event_id() == m_event_id);
+ EXPECT_TRUE(event->get_port_id() == 0);
+
+ }
+
+private:
+ int m_event_id;
};
+TEST_F(basic_stl, dp_stop_event) {
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/ignore";
+
+ TrexStreamsCompiler compile;
+
+ uint8_t port_id=0;
+
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST,0,0);
+ stream1->set_pps(1.0);
+ stream1->set_single_burst(100);
+
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_port_id= port_id;
-TEST_F(dp_sl_basic, test1) {
- CTRexDpStatelessStream s1;
- s1.set_enable(true);
- s1.set_self_start(true);
- s1.set_inter_stream_gap(0.77);
- s1.get_mode().set_mode(CTRexDpStreamMode::moCONTINUES);
- s1.get_mode().cont().set_pps(100.2);
- s1.dump(stdout);
-}
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+ // stream - clean
+
+ TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
+
+ assert(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(port_id, 17, comp_obj.clone(), 10.0 /*sec */ );
+
+
+ t1.m_msg = lpstart;
+
+ /* let me handle these */
+ DpToCpHandlerStopEvent handler(17);
+ t1.m_dp_to_cp_handler = &handler;
+
+ bool res=t1.init();
+ EXPECT_EQ_UINT32(1, res?1:0);
+
+ delete stream1 ;
+
+}
-#endif
+/********************************************* Itay Tests End *************************************/
diff --git a/src/gtest/tuple_gen_test.cpp b/src/gtest/tuple_gen_test.cpp
index 8a774e38..f3b9fa1e 100755
--- a/src/gtest/tuple_gen_test.cpp
+++ b/src/gtest/tuple_gen_test.cpp
@@ -161,7 +161,6 @@ TEST(tuple_gen,clientPoolL) {
0,0);
CTupleBase result;
uint32_t result_src;
- uint32_t result_dest;
uint16_t result_port;
for(int i=0;i<10;i++) {
@@ -186,7 +185,6 @@ TEST(tuple_gen,clientPool) {
0,0);
CTupleBase result;
uint32_t result_src;
- uint32_t result_dest;
uint16_t result_port;
for(int i=0;i<10;i++) {
@@ -436,7 +434,6 @@ TEST(tuple_gen,template1) {
template_1.GenerateTuple(result);
uint32_t result_src = result.getClient();
uint32_t result_dest = result.getServer();
- uint16_t result_port = result.getClientPort();
//printf(" %x %x %x \n",result_src,result_dest,result_port);
EXPECT_EQ(result_src, (uint32_t)(0x10000001+i));
EXPECT_EQ(result_dest, (uint32_t)(((0x12121212)) ));
@@ -489,9 +486,6 @@ TEST(tuple_gen,no_free) {
int i;
for (i=0; i<65557; i++) {
template_1.GenerateTuple(result);
- uint32_t result_src = result.getClient();
- uint32_t result_dest = result.getServer();
- uint16_t result_port = result.getClientPort();
}
// should have error
EXPECT_TRUE((gen.getErrorAllocationCounter()>0)?true:false);
@@ -514,8 +508,6 @@ TEST(tuple_gen,try_to_free) {
int i;
for (i=0; i<65557; i++) {
template_1.GenerateTuple(result);
- uint32_t result_src = result.getClient();
- uint32_t result_dest = result.getServer();
uint16_t result_port = result.getClientPort();
gen.FreePort(0,result.getClientId(),result_port);
}
diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h
index 5c2d42d2..5890a965 100644
--- a/src/internal_api/trex_platform_api.h
+++ b/src/internal_api/trex_platform_api.h
@@ -23,6 +23,7 @@ limitations under the License.
#define __TREX_PLATFORM_API_H__
#include <stdint.h>
+#include <vector>
/**
* Global stats
@@ -96,6 +97,7 @@ public:
class TrexPlatformApi {
public:
+ virtual void port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list) const = 0;
virtual void get_global_stats(TrexPlatformGlobalStats &stats) const = 0;
virtual void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const = 0;
virtual uint8_t get_dp_core_count() const = 0;
@@ -110,6 +112,7 @@ public:
*/
class TrexDpdkPlatformApi : public TrexPlatformApi {
public:
+ void port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list) const;
void get_global_stats(TrexPlatformGlobalStats &stats) const;
void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const;
uint8_t get_dp_core_count() const;
@@ -122,6 +125,7 @@ public:
*/
class TrexMockPlatformApi : public TrexPlatformApi {
public:
+ void port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list) const {}
void get_global_stats(TrexPlatformGlobalStats &stats) const;
void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const;
uint8_t get_dp_core_count() const;
diff --git a/src/main.cpp b/src/main.cpp
index bd64c5a4..b633fce6 100755
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -26,6 +26,8 @@ limitations under the License.
#include <common/arg/SimpleGlob.h>
#include <common/arg/SimpleOpt.h>
+#include <stateless/cp/trex_stateless.h>
+
// An enum for all the option types
enum { OPT_HELP, OPT_CFG, OPT_NODE_DUMP, OP_STATS,
@@ -94,21 +96,20 @@ static int usage(){
int gtest_main(int argc, char **argv) ;
-static int parse_options(int argc, char *argv[], CParserOption* po ) {
+static int parse_options(int argc, char *argv[], CParserOption* po, bool & is_gtest ) {
CSimpleOpt args(argc, argv, parser_options);
int a=0;
int node_dump=0;
po->preview.clean();
po->preview.setFileWrite(true);
- int res1;
while ( args.Next() ){
if (args.LastError() == SO_SUCCESS) {
switch (args.OptionId()) {
case OPT_UT :
- res1=gtest_main(argc, argv);
- exit(res1);
+ is_gtest=true;
+ return (0);
break;
case OPT_HELP:
usage();
@@ -214,11 +215,12 @@ void * thread_task(void *info){
char buf[100];
sprintf(buf,"my%d.erf",obj->thread_id);
- volatile int i;
lpt->start_generate_stateful(buf,*obj->preview_info);
lpt->m_node_gen.DumpHist(stdout);
printf("end thread %d \n",obj->thread_id);
}
+
+ return (NULL);
}
@@ -405,8 +407,6 @@ void update_tcp_seq_num(CCapFileFlowInfo * obj,
int i;
for (i=pkt_id+1; i<s; i++) {
- uint32_t seq;
- uint32_t ack;
pkt=obj->GetPacket(i);
tcp=pkt->m_pkt_indication.l4.m_tcp;
@@ -490,7 +490,7 @@ int manipolate_capfile() {
CCapFileFlowInfo flow_info;
flow_info.Create();
- int res=flow_info.load_cap_file("avl/delay_10_rtsp_0.pcap",0,0);
+ flow_info.load_cap_file("avl/delay_10_rtsp_0.pcap",0,0);
change_pkt_len(&flow_info,4-1 ,6);
change_pkt_len(&flow_info,5-1 ,6);
@@ -515,7 +515,7 @@ int manipolate_capfile_sip() {
CCapFileFlowInfo flow_info;
flow_info.Create();
- int res=flow_info.load_cap_file("avl/delay_10_sip_0.pcap",0,0);
+ flow_info.load_cap_file("avl/delay_10_sip_0.pcap",0,0);
change_pkt_len(&flow_info,1-1 ,6+6);
change_pkt_len(&flow_info,2-1 ,6+6);
@@ -532,8 +532,8 @@ int manipolate_capfile_sip1() {
CCapFileFlowInfo flow_info;
flow_info.Create();
- int res=flow_info.load_cap_file("avl/delay_sip_0.pcap",0,0);
- CFlowPktInfo * pkt=flow_info.GetPacket(1);
+ flow_info.load_cap_file("avl/delay_sip_0.pcap",0,0);
+ flow_info.GetPacket(1);
change_pkt_len(&flow_info,1-1 ,6+6+10);
@@ -569,7 +569,7 @@ public:
void CMergeCapFileRec::Dump(FILE *fd,int _id){
- double time;
+ double time = 0.0;
bool stop=GetCurPacket(time);
fprintf (fd," id:%2d stop : %d index:%4d %3.4f \n",_id,stop?1:0,m_index,time);
}
@@ -620,6 +620,8 @@ bool CMergeCapFileRec::Create(std::string cap_file,
m_limit_number_of_packets =0;
m_start_time = pkt->m_packet->get_time() ;
m_offset = offset;
+
+ return (true);
}
@@ -663,12 +665,12 @@ bool CMergeCapFile::run_merge(std::string to_cap_file){
int min_index=0;
double min_time;
- fprintf(stdout," --------------\n",cnt);
+ fprintf(stdout," --------------\n");
fprintf(stdout," pkt : %d \n",cnt);
for (i=0; i<MERGE_CAP_FILES; i++) {
m[i].Dump(stdout,i);
}
- fprintf(stdout," --------------\n",cnt);
+ fprintf(stdout," --------------\n");
bool valid = false;
for (i=0; i<MERGE_CAP_FILES; i++) {
@@ -702,6 +704,8 @@ bool CMergeCapFile::run_merge(std::string to_cap_file){
};
m_results.save_to_erf(to_cap_file,1);
+
+ return (true);
}
@@ -746,18 +750,51 @@ int merge_2_cap_files_sip() {
return (0);
}
+static TrexStateless *g_trex_stateless;
+
+
+TrexStateless * get_stateless_obj() {
+ return g_trex_stateless;
+}
+
+extern "C" const char * get_build_date(void){
+ return (__DATE__);
+}
+
+extern "C" const char * get_build_time(void){
+ return (__TIME__ );
+}
+
+
+
int main(int argc , char * argv[]){
+ int res=0;
time_init();
CGlobalInfo::m_socket.Create(0);
-
CGlobalInfo::init_pools(1000);
assert( CMsgIns::Ins()->Create(4) );
- if ( parse_options(argc, argv, &CGlobalInfo::m_options ) != 0){
+
+ bool is_gtest=false;
+
+ if ( parse_options(argc, argv, &CGlobalInfo::m_options , is_gtest) != 0){
exit(-1);
}
- return (load_list_of_cap_files(&CGlobalInfo::m_options));
+
+ if ( is_gtest ) {
+ res = gtest_main(argc, argv);
+ }else{
+ res = load_list_of_cap_files(&CGlobalInfo::m_options);
+ }
+
+ CMsgIns::Ins()->Free();
+ CGlobalInfo::free_pools();
+ CGlobalInfo::m_socket.Delete();
+
+
+ return (res);
+
}
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index d4e07ef2..b1c9ed12 100755
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -58,6 +58,8 @@ limitations under the License.
#include <stateless/cp/trex_stateless.h>
#include <stateless/dp/trex_stream_node.h>
+#include <publisher/trex_publisher.h>
+#include <stateless/messaging/trex_stateless_messaging.h>
#include <../linux_dpdk/version.h>
@@ -551,19 +553,17 @@ static int usage(){
printf(" --mac [file] : YAML file with <client ip, mac addr> configuration \n");
printf(" \n\n");
- printf(" -r : realtime enable \n");
- printf(" \n\n");
- printf(" -c [number of cores] : 1 ,2,3,4,5 numnber of dual cores + master 1 means 1 master and 2 cores \n");
+ printf(" -c [number of threads] : default is 1. number of threads to allocate for each dual ports. \n");
printf(" \n");
- printf(" -s : run only one data path core\n");
+ printf(" -s : run only one data path core. for debug\n");
printf(" \n");
- printf(" --flip : flow will be sent from client->server and server->client for maximum throughput \n");
+ printf(" --flip : flow will be sent from client->server and server->client for maximum throughput \n");
printf(" \n");
- printf(" -p : flow-flip , send all packets flow from the same interface base of client ip \n");
+ printf(" -p : flow-flip , send all flow packets from the same interface base of client ip \n");
printf(" -e : like -p but comply to the generator rules \n");
printf(" \n");
- printf(" -l [pkt/sec] : run laterncy daemon in this rate \n");
+ printf(" -l [pkt/sec] : run latency daemon in this rate \n");
printf(" e.g -l 1000 run 1000 pkt/sec from each interface , zero mean to disable latency check \n");
printf(" --lm : latency mask \n");
printf(" 0x1 only port 0 will send traffic \n");
@@ -571,20 +571,18 @@ static int usage(){
printf(" \n");
- printf(" --limit-ports : limit number of ports , must be even e.g 2,4 \n");
+ printf(" --limit-ports : limit number of ports, must be even e.g. 2,4 \n");
printf(" \n");
- printf(" --nc : if set will not close all the flow , faster \n");
+ printf(" --nc : If set, will not wait for all the flows to be closed, terminate faster- see manual for more information \n");
printf(" \n");
- printf(" -d : duration of the test in sec \n");
+ printf(" -d : duration of the test in sec. look for --nc \n");
printf(" \n");
- printf(" -pm : platform factor , in case you have splitter in the setup you can multiply the total results in this factor \n");
+ printf(" -pm : platform factor ,in case you have splitter in the setup you can multiply the total results in this factor \n");
printf(" e.g --pm 2.0 will multiply all the results bps in this factor \n");
printf(" \n");
printf(" -pubd : disable monitors publishers \n");
- printf(" -m : factor of bandwidth \n");
- printf(" \n");
- printf(" -1g : 1G trex \n");
+ printf(" -m : factor of bandwidth \n");
printf(" \n");
printf(" -k [sec] : run latency test before starting the test. it will wait for x sec sending packet and x sec after that \n");
printf(" \n");
@@ -594,7 +592,7 @@ static int usage(){
printf(" you can copy this file to /etc/trex_cfg.yaml \n");
printf(" \n");
- printf(" --ipv6 : work in ipv6 mode \n");
+ printf(" --ipv6 : work in ipv6 mode\n");
printf(" --learn : Work in NAT environments, learn the dynamic NAT translation and ALG \n");
printf(" --learn-verify : Learn the translation, but intended for verification of the mechanism in cases that NAT does not exist \n");
@@ -609,17 +607,17 @@ static int usage(){
printf(" Warning : This program can generate huge-files (TB ) watch out! try this only on local drive \n");
printf(" \n");
printf(" \n");
- printf(" --rx-check [sample] : enable rx check thread , using this thread we sample flows 1/sample and check order,latency and more \n");
+ printf(" --rx-check [sample] : enable rx check thread, using this thread we sample flows 1/sample and check order,latency and more \n");
printf(" this feature consume another thread \n");
printf(" \n");
- printf(" --hops [hops] : If rx check is enabled, the hop number can be assigned. The default number of hops is 1\n");
- printf(" --iom [mode] : io mode for interactive mode [0- silent, 1- normal , 2- short] \n");
+ printf(" --hops [hops] : If rx check is enabled, the hop number can be assigned. The default number of hops is 1\n");
+ printf(" --iom [mode] : io mode for interactive mode [0- silent, 1- normal , 2- short] \n");
printf(" this feature consume another thread \n");
printf(" \n");
- printf(" --no-key : daemon mode, don't get input from keyboard \n");
- printf(" --no-flow-control : In default TRex disables flow-control using this flag it does not touch it \n");
- printf(" --prefix : for multi trex, each instance should have a different name \n");
- printf(" --mac-spread : Spread the destination mac-order by this factor. e.g 2 will generate the traffic to 2 devices DEST-MAC ,DEST-MAC+1 \n");
+ printf(" --no-key : daemon mode, don't get input from keyboard \n");
+ printf(" --no-flow-control : In default TRex disables flow-control using this flag it does not touch it \n");
+ printf(" --prefix : for multi trex, each instance should have a different name \n");
+ printf(" --mac-spread : Spread the destination mac-order by this factor. e.g 2 will generate the traffic to 2 devices DEST-MAC ,DEST-MAC+1 \n");
printf(" maximum is up to 128 devices \n");
@@ -630,7 +628,7 @@ static int usage(){
printf(" \n");
printf(" -o [capfile_name] simulate trex into pcap file \n");
printf(" --pcap export the file in pcap mode \n");
- printf(" t-rex-64 -d 10 -f cfg.yaml -o my.pcap --pcap # export 10 sec of what Trex will do on real-time to a file my.pcap \n");
+ printf(" bp-sim-64 -d 10 -f cfg.yaml -o my.pcap --pcap # export 10 sec of what Trex will do on real-time to a file my.pcap \n");
printf(" --vm-sim : simulate vm with driver of one input queue and one output queue \n");
printf(" \n");
printf(" Examples: ");
@@ -688,12 +686,13 @@ static int parse_options(int argc, char *argv[], CParserOption* po, bool first_t
CSimpleOpt args(argc, argv, parser_options);
bool latency_was_set=false;
+ (void)latency_was_set;
+
int a=0;
int node_dump=0;
po->preview.setFileWrite(true);
po->preview.setRealTime(true);
- int res1;
uint32_t tmp_data;
po->m_run_mode = CParserOption::RUN_MODE_INVALID;
@@ -891,7 +890,7 @@ static int parse_options(int argc, char *argv[], CParserOption* po, bool first_t
}
if (po->preview.get_is_rx_check_enable() && ( po->is_latency_disabled() ) ) {
- printf(" rx check must be enable with latency check. try adding '-l 1000' \n");
+ printf(" rx check must be enabled with latency check. try adding '-l 1000' \n");
return -1;
}
@@ -904,7 +903,7 @@ static int parse_options(int argc, char *argv[], CParserOption* po, bool first_t
uint32_t cores=po->preview.getCores();
if ( cores > ((BP_MAX_CORES)/2-1) ) {
- printf(" ERROR maximum cores are : %d \n",((BP_MAX_CORES)/2-1));
+ printf(" ERROR maximum supported cores are : %d \n",((BP_MAX_CORES)/2-1));
return -1;
}
@@ -951,9 +950,9 @@ static int parse_options(int argc, char *argv[], CParserOption* po, bool first_t
int main_test(int argc , char * argv[]);
-static const char * default_argv[] = {"xx","-c", "0x7", "-n","2","-b","0000:0b:01.01"};
-static int argv_num = 7;
-
+//static const char * default_argv[] = {"xx","-c", "0x7", "-n","2","-b","0000:0b:01.01"};
+//static int argv_num = 7;
+
#define RX_PTHRESH 8 /**< Default values of RX prefetch threshold reg. */
@@ -1137,8 +1136,8 @@ void CPhyEthIFStats::Clear(){
void CPhyEthIFStats::DumpAll(FILE *fd){
- #define DP_A4(f) printf(" %-40s : %llu \n",#f,f)
- #define DP_A(f) if (f) printf(" %-40s : %llu \n",#f,f)
+ #define DP_A4(f) printf(" %-40s : %llu \n",#f, (unsigned long long)f)
+ #define DP_A(f) if (f) printf(" %-40s : %llu \n",#f, (unsigned long long)f)
DP_A4(opackets);
DP_A4(obytes);
DP_A4(ipackets);
@@ -1177,7 +1176,7 @@ public:
m_port_id = portid;
m_last_rx_rate = 0.0;
m_last_tx_rate = 0.0;
- m_last_pps=0.0;
+ m_last_tx_pps = 0.0;
return (true);
}
void Delete();
@@ -1253,8 +1252,12 @@ public:
return (m_last_rx_rate);
}
- float get_last_pps_rate(){
- return (m_last_pps);
+ float get_last_tx_pps_rate(){
+ return (m_last_tx_pps);
+ }
+
+ float get_last_rx_pps_rate(){
+ return (m_last_rx_pps);
}
CPhyEthIFStats & get_stats(){
@@ -1307,12 +1310,14 @@ private:
CBwMeasure m_bw_tx;
CBwMeasure m_bw_rx;
CPPSMeasure m_pps_tx;
+ CPPSMeasure m_pps_rx;
CPhyEthIFStats m_stats;
- float m_last_rx_rate;
float m_last_tx_rate;
- float m_last_pps;
+ float m_last_rx_rate;
+ float m_last_tx_pps;
+ float m_last_rx_pps;
public:
struct rte_eth_dev_info m_dev_info;
};
@@ -1607,10 +1612,6 @@ void CPhyEthIF::macaddr_get(struct ether_addr *mac_addr){
void CPhyEthIF::get_stats_1g(CPhyEthIFStats *stats){
- int i;
- uint64_t t=0;
-
-
stats->ipackets += pci_reg_read(E1000_GPRC) ;
stats->ibytes += (pci_reg_read(E1000_GORCL) );
@@ -1648,7 +1649,8 @@ void CPhyEthIF::get_stats_1g(CPhyEthIFStats *stats){
m_last_tx_rate = m_bw_tx.add(stats->obytes);
m_last_rx_rate = m_bw_rx.add(stats->ibytes);
- m_last_pps = m_pps_tx.add(stats->opackets);
+ m_last_tx_pps = m_pps_tx.add(stats->opackets);
+ m_last_rx_pps = m_pps_rx.add(stats->ipackets);
}
@@ -1658,14 +1660,15 @@ void CPhyEthIF::get_stats(CPhyEthIFStats *stats){
m_last_tx_rate = m_bw_tx.add(stats->obytes);
m_last_rx_rate = m_bw_rx.add(stats->ibytes);
- m_last_pps = m_pps_tx.add(stats->opackets);
+ m_last_tx_pps = m_pps_tx.add(stats->opackets);
+ m_last_rx_pps = m_pps_rx.add(stats->ipackets);
}
void dump_hw_state(FILE *fd,struct ixgbe_hw_stats *hs ){
- #define DP_A1(f) if (hs->f) fprintf(fd," %-40s : %llu \n",#f,hs->f)
- #define DP_A2(f,m) for (i=0;i<m; i++) { if (hs->f[i]) fprintf(fd," %-40s[%d] : %llu \n",#f,i,hs->f[i]); }
+ #define DP_A1(f) if (hs->f) fprintf(fd," %-40s : %llu \n",#f, (unsigned long long)hs->f)
+ #define DP_A2(f,m) for (i=0;i<m; i++) { if (hs->f[i]) fprintf(fd," %-40s[%d] : %llu \n",#f,i, (unsigned long long)hs->f[i]); }
int i;
//for (i=0;i<8; i++) { if (hs->mpc[i]) fprintf(fd," %-40s[%d] : %llu \n","mpc",i,hs->mpc[i]); }
@@ -1849,6 +1852,7 @@ public:
virtual int update_mac_addr_from_global_cfg(pkt_dir_t dir, rte_mbuf_t *m);
+ virtual pkt_dir_t port_id_to_dir(uint8_t port_id);
public:
void GetCoreCounters(CVirtualIFPerSideStats *stats);
@@ -1861,6 +1865,10 @@ public:
return ( CGlobalInfo::m_socket.port_to_socket( m_ports[0].m_port->get_port_id() ) );
}
+ const CCorePerPort * get_ports() {
+ return m_ports;
+ }
+
protected:
int send_burst(CCorePerPort * lp_port,
@@ -2087,6 +2095,8 @@ int CCoreEthIF::send_burst(CCorePerPort * lp_port,
rte_pktmbuf_free(m);
}
}
+
+ return (0);
}
@@ -2107,6 +2117,8 @@ int CCoreEthIF::send_pkt(CCorePerPort * lp_port,
len = 0;
}
lp_port->m_len = len;
+
+ return (0);
}
@@ -2265,7 +2277,17 @@ int CCoreEthIF::update_mac_addr_from_global_cfg(pkt_dir_t dir,
return (0);
}
+pkt_dir_t
+CCoreEthIF::port_id_to_dir(uint8_t port_id) {
+
+ for (pkt_dir_t dir = 0; dir < CS_NUM; dir++) {
+ if (m_ports[dir].m_port->get_port_id() == port_id) {
+ return dir;
+ }
+ }
+ return (CS_INVALID);
+}
class CLatencyHWPort : public CPortLatencyHWBase {
public:
@@ -2377,71 +2399,6 @@ private:
};
-class CZMqPublisher {
-public:
- CZMqPublisher(){
- m_context=0;
- m_publisher=0;
- }
-
- bool Create(uint16_t port,bool disable);
- void Delete();
- void publish_json(std::string & s);
-private:
- void show_zmq_last_error(char *s);
-private:
- void * m_context;
- void * m_publisher;
-};
-
-void CZMqPublisher::show_zmq_last_error(char *s){
- printf(" ERROR %s \n",s);
- printf(" ZMQ: %s",zmq_strerror (zmq_errno ()));
- exit(-1);
-}
-
-
-bool CZMqPublisher::Create(uint16_t port,bool disable){
-
- if (disable) {
- return(true);
- }
- m_context = zmq_ctx_new ();
- if ( m_context == 0 ) {
- show_zmq_last_error((char *)"can't connect to ZMQ library");
- }
- m_publisher = zmq_socket (m_context, ZMQ_PUB);
- if ( m_context == 0 ) {
- show_zmq_last_error((char *)"can't create ZMQ socket");
- }
- char buffer[100];
- sprintf(buffer,"tcp://*:%d",port);
- int rc=zmq_bind (m_publisher, buffer);
- if (rc != 0 ) {
- sprintf(buffer,"can't bind to ZMQ socket %d",port);
- show_zmq_last_error(buffer);
- }
- printf("zmq publisher at: %s \n",buffer);
- return (true);
-}
-
-
-void CZMqPublisher::Delete(){
- if (m_publisher) {
- zmq_close (m_publisher);
- }
- if (m_context) {
- zmq_ctx_destroy (m_context);
- }
-}
-
-
-void CZMqPublisher::publish_json(std::string & s){
- if ( m_publisher ){
- int size = zmq_send (m_publisher, s.c_str(), s.length(), 0);
- assert(size==s.length());
- }
-}
class CPerPortStats {
public:
@@ -2453,6 +2410,10 @@ public:
uint64_t oerrors;
float m_total_tx_bps;
+ float m_total_tx_pps;
+
+ float m_total_rx_bps;
+ float m_total_rx_pps;
};
class CGlobalStats {
@@ -2489,6 +2450,7 @@ public:
float m_tx_bps;
float m_rx_bps;
float m_tx_pps;
+ float m_rx_pps;
float m_tx_cps;
float m_tx_expected_cps;
float m_tx_expected_pps;
@@ -2521,7 +2483,7 @@ std::string CGlobalStats::get_field(std::string name,float &f){
std::string CGlobalStats::get_field(std::string name,uint64_t &f){
char buff[200];
- sprintf(buff,"\"%s\":%llu,",name.c_str(),f);
+ sprintf(buff,"\"%s\":%llu,",name.c_str(), (unsigned long long)f);
return (std::string(buff));
}
@@ -2533,7 +2495,7 @@ std::string CGlobalStats::get_field_port(int port,std::string name,float &f){
std::string CGlobalStats::get_field_port(int port,std::string name,uint64_t &f){
char buff[200];
- sprintf(buff,"\"%s-%d\":%llu,",name.c_str(),port,f);
+ sprintf(buff,"\"%s-%d\":%llu,",name.c_str(),port, (unsigned long long)f);
return (std::string(buff));
}
@@ -2549,6 +2511,7 @@ void CGlobalStats::dump_json(std::string & json){
json+=GET_FIELD(m_tx_bps);
json+=GET_FIELD(m_rx_bps);
json+=GET_FIELD(m_tx_pps);
+ json+=GET_FIELD(m_rx_pps);
json+=GET_FIELD(m_tx_cps);
json+=GET_FIELD(m_tx_expected_cps);
json+=GET_FIELD(m_tx_expected_pps);
@@ -2583,6 +2546,9 @@ void CGlobalStats::dump_json(std::string & json){
json+=GET_FIELD_PORT(i,ierrors) ;
json+=GET_FIELD_PORT(i,oerrors) ;
json+=GET_FIELD_PORT(i,m_total_tx_bps);
+ json+=GET_FIELD_PORT(i,m_total_tx_pps);
+ json+=GET_FIELD_PORT(i,m_total_rx_bps);
+ json+=GET_FIELD_PORT(i,m_total_rx_pps);
}
json+=m_template.dump_as_json("template");
json+="\"unknown\":0}}" ;
@@ -2602,7 +2568,7 @@ void CGlobalStats::DumpAllPorts(FILE *fd){
fprintf (fd," Platform_factor : %2.1f \n",m_platform_factor);
fprintf (fd," Total-Tx : %s ",double_to_human_str(m_tx_bps,"bps",KBYE_1000).c_str());
if ( CGlobalInfo::is_learn_mode() ) {
- fprintf (fd," Nat_time_out : %8llu \n",m_total_nat_time_out);
+ fprintf (fd," Nat_time_out : %8llu \n", (unsigned long long)m_total_nat_time_out);
}else{
fprintf (fd,"\n");
}
@@ -2610,49 +2576,52 @@ void CGlobalStats::DumpAllPorts(FILE *fd){
fprintf (fd," Total-Rx : %s ",double_to_human_str(m_rx_bps,"bps",KBYE_1000).c_str());
if ( CGlobalInfo::is_learn_mode() ) {
- fprintf (fd," Nat_no_fid : %8llu \n",m_total_nat_no_fid);
+ fprintf (fd," Nat_no_fid : %8llu \n", (unsigned long long)m_total_nat_no_fid);
}else{
fprintf (fd,"\n");
}
fprintf (fd," Total-PPS : %s ",double_to_human_str(m_tx_pps,"pps",KBYE_1000).c_str());
if ( CGlobalInfo::is_learn_mode() ) {
- fprintf (fd," Total_nat_active: %8llu \n",m_total_nat_active);
+ fprintf (fd," Total_nat_active: %8llu \n", (unsigned long long)m_total_nat_active);
}else{
fprintf (fd,"\n");
}
fprintf (fd," Total-CPS : %s ",double_to_human_str(m_tx_cps,"cps",KBYE_1000).c_str());
if ( CGlobalInfo::is_learn_mode() ) {
- fprintf (fd," Total_nat_open : %8llu \n",m_total_nat_open);
+ fprintf (fd," Total_nat_open : %8llu \n", (unsigned long long)m_total_nat_open);
}else{
fprintf (fd,"\n");
}
fprintf (fd,"\n");
fprintf (fd," Expected-PPS : %s ",double_to_human_str(m_tx_expected_pps,"pps",KBYE_1000).c_str());
if ( CGlobalInfo::is_learn_verify_mode() ) {
- fprintf (fd," Nat_learn_errors: %8llu \n",m_total_nat_learn_error);
+ fprintf (fd," Nat_learn_errors: %8llu \n", (unsigned long long)m_total_nat_learn_error);
}else{
fprintf (fd,"\n");
}
fprintf (fd," Expected-CPS : %s \n",double_to_human_str(m_tx_expected_cps,"cps",KBYE_1000).c_str());
fprintf (fd," Expected-BPS : %s \n",double_to_human_str(m_tx_expected_bps,"bps",KBYE_1000).c_str());
fprintf (fd,"\n");
- fprintf (fd," Active-flows : %8llu Clients : %8llu Socket-util : %3.4f %% \n",(uint64_t)m_active_flows,m_total_clients,m_socket_util);
+ fprintf (fd," Active-flows : %8llu Clients : %8llu Socket-util : %3.4f %% \n",
+ (unsigned long long)m_active_flows,
+ (unsigned long long)m_total_clients,
+ m_socket_util);
fprintf (fd," Open-flows : %8llu Servers : %8llu Socket : %8llu Socket/Clients : %.1f \n",
- (uint64_t)m_open_flows,
- m_total_servers,
- m_active_sockets,
+ (unsigned long long)m_open_flows,
+ (unsigned long long)m_total_servers,
+ (unsigned long long)m_active_sockets,
(float)m_active_sockets/(float)m_total_clients);
if (m_total_alloc_error) {
- fprintf (fd," Total_alloc_err : %llu \n",(uint64_t)m_total_alloc_error);
+ fprintf (fd," Total_alloc_err : %llu \n", (unsigned long long)m_total_alloc_error);
}
if ( m_total_queue_full ){
- fprintf (fd," Total_queue_full : %llu \n",(uint64_t)m_total_queue_full);
+ fprintf (fd," Total_queue_full : %llu \n", (unsigned long long)m_total_queue_full);
}
if (m_total_queue_drop) {
- fprintf (fd," Total_queue_drop : %llu \n",(uint64_t)m_total_queue_drop);
+ fprintf (fd," Total_queue_drop : %llu \n", (unsigned long long)m_total_queue_drop);
}
//m_template.Dump(fd);
@@ -2676,8 +2645,8 @@ void CGlobalStats::Dump(FILE *fd,DumpFormat mode){
CPerPortStats * lp=&m_port[i];
fprintf(fd,"port : %d \n",(int)i);
fprintf(fd,"------------\n");
- #define GS_DP_A4(f) fprintf(fd," %-40s : %llu \n",#f,lp->f)
- #define GS_DP_A(f) if (lp->f) fprintf(fd," %-40s : %llu \n",#f,lp->f)
+ #define GS_DP_A4(f) fprintf(fd," %-40s : %llu \n",#f, (unsigned long long)lp->f)
+ #define GS_DP_A(f) if (lp->f) fprintf(fd," %-40s : %llu \n",#f, (unsigned long long)lp->f)
GS_DP_A4(opackets);
GS_DP_A4(obytes);
GS_DP_A4(ipackets);
@@ -2785,8 +2754,19 @@ public:
int reset_counters();
+public:
+
+private:
+ /* try to stop all datapath cores */
+ void try_stop_all_dp();
+ /* send message to all dp cores */
+ int send_message_all_dp(TrexStatelessCpToDpMsgBase *msg);
+
+ void check_for_dp_message_from_core(int thread_id);
+ void check_for_dp_messages();
public:
+
int start_send_master();
int start_master_stateless();
@@ -2925,7 +2905,7 @@ private:
CLatencyVmPort m_latency_vm_vports[BP_MAX_PORTS]; /* vm driver */
CLatencyPktInfo m_latency_pkt;
- CZMqPublisher m_zmq_publisher;
+ TrexPublisher m_zmq_publisher;
public:
TrexStateless *m_trex_stateless;
@@ -2966,13 +2946,13 @@ int CGlobalTRex::rcv_send_all(int queue_id){
int CGlobalTRex::test_send(){
int i;
- CPhyEthIF * lp=&m_ports[0];
-
//set_promisc_all(true);
//create_sctp_pkt();
create_udp_pkt();
CRx_check_header rx_check_header;
+ (void)rx_check_header;
+
rx_check_header.m_time_stamp=0x1234567;
rx_check_header.m_option_type=RX_CHECK_V4_OPT_TYPE;
rx_check_header.m_option_len=RX_CHECK_V4_OPT_LEN;
@@ -3046,7 +3026,7 @@ int CGlobalTRex::test_send(){
}*/
#endif
- fprintf(stdout," drop : %llu \n",m_test_drop);
+ fprintf(stdout," drop : %llu \n", (unsigned long long)m_test_drop);
return (0);
}
@@ -3177,6 +3157,8 @@ int CGlobalTRex::set_promisc_all(bool enable){
CPhyEthIF * _if=&m_ports[i];
_if->set_promiscuous(enable);
}
+
+ return (0);
}
@@ -3187,8 +3169,52 @@ int CGlobalTRex::reset_counters(){
CPhyEthIF * _if=&m_ports[i];
_if->stats_clear();
}
+
+ return (0);
}
+/**
+ * check for a single core
+ *
+ * @author imarom (19-Nov-15)
+ *
+ * @param thread_id
+ */
+void
+CGlobalTRex::check_for_dp_message_from_core(int thread_id) {
+
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(thread_id);
+
+ /* fast path check */
+ if ( likely ( ring->isEmpty() ) ) {
+ return;
+ }
+
+ while ( true ) {
+ CGenNode * node = NULL;
+ if (ring->Dequeue(node) != 0) {
+ break;
+ }
+ assert(node);
+
+ TrexStatelessDpToCpMsgBase * msg = (TrexStatelessDpToCpMsgBase *)node;
+ msg->handle();
+ delete msg;
+ }
+
+}
+
+/**
+ * check for messages that arrived from DP to CP
+ *
+ */
+void
+CGlobalTRex::check_for_dp_messages() {
+ /* for all the cores - check for a new message */
+ for (int i = 0; i < get_cores_tx(); i++) {
+ check_for_dp_message_from_core(i);
+ }
+}
bool CGlobalTRex::is_all_links_are_up(bool dump){
bool all_link_are=true;
@@ -3208,6 +3234,40 @@ bool CGlobalTRex::is_all_links_are_up(bool dump){
}
+void CGlobalTRex::try_stop_all_dp(){
+
+ TrexStatelessDpQuit * msg= new TrexStatelessDpQuit();
+ send_message_all_dp(msg);
+ delete msg;
+ bool all_core_finished = false;
+ int i;
+ for (i=0; i<20; i++) {
+ if ( is_all_cores_finished() ){
+ all_core_finished =true;
+ break;
+ }
+ delay(100);
+ }
+ if ( all_core_finished ){
+ printf(" All cores stopped !! \n");
+ }else{
+ printf(" ERROR one of the DP core is stucked !\n");
+ }
+}
+
+
+int CGlobalTRex::send_message_all_dp(TrexStatelessCpToDpMsgBase *msg){
+
+ int max_threads=(int)CMsgIns::Ins()->getCpDp()->get_num_threads();
+ int i;
+
+ for (i=0; i<max_threads; i++) {
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp((uint8_t)i);
+ ring->Enqueue((CGenNode*)msg->clone());
+ }
+ return (0);
+}
+
int CGlobalTRex::ixgbe_rx_queue_flush(){
int i;
@@ -3259,6 +3319,8 @@ int CGlobalTRex::ixgbe_configure_mg(void){
m_mg.Create(&mg_cfg);
m_mg.set_mask(CGlobalInfo::m_options.m_latency_mask);
+
+ return (0);
}
@@ -3381,7 +3443,6 @@ int CGlobalTRex::ixgbe_start(void){
*/
int port_offset=0;
- int queue_offset=0;
for (i=0; i<get_cores_tx(); i++) {
int j=(i+1);
int queue_id=((j-1)/get_base_num_cores() ); /* for the first min core queue 0 , then queue 1 etc */
@@ -3408,27 +3469,15 @@ int CGlobalTRex::ixgbe_start(void){
m_cores_vif[i+1]->DumpIfCfg(stdout);
}
fprintf(stdout," -------------------------------\n");
+
+ return (0);
}
bool CGlobalTRex::Create(){
CFlowsYamlInfo pre_yaml_info;
- if (get_is_stateless()) {
-
- TrexStatelessCfg cfg;
-
- TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, global_platform_cfg_info.m_zmq_rpc_port);
-
- cfg.m_port_count = CGlobalInfo::m_options.m_expected_portd;
- cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg;
- cfg.m_rpc_async_cfg = NULL;
- cfg.m_rpc_server_verbose = false;
- cfg.m_platform_api = new TrexDpdkPlatformApi();
-
- m_trex_stateless = new TrexStateless(cfg);
-
- } else {
+ if (!get_is_stateless()) {
pre_yaml_info.load_from_yaml_file(CGlobalInfo::m_options.cfg_file);
}
@@ -3450,12 +3499,12 @@ bool CGlobalTRex::Create(){
assert( CMsgIns::Ins()->Create(get_cores_tx()) );
if ( sizeof(CGenNodeNatInfo) != sizeof(CGenNode) ) {
- printf("ERROR sizeof(CGenNodeNatInfo) %d != sizeof(CGenNode) %d must be the same size \n",sizeof(CGenNodeNatInfo),sizeof(CGenNode));
+ printf("ERROR sizeof(CGenNodeNatInfo) %lu != sizeof(CGenNode) %lu must be the same size \n",sizeof(CGenNodeNatInfo),sizeof(CGenNode));
assert(0);
}
if ( sizeof(CGenNodeLatencyPktInfo) != sizeof(CGenNode) ) {
- printf("ERROR sizeof(CGenNodeLatencyPktInfo) %d != sizeof(CGenNode) %d must be the same size \n",sizeof(CGenNodeLatencyPktInfo),sizeof(CGenNode));
+ printf("ERROR sizeof(CGenNodeLatencyPktInfo) %lu != sizeof(CGenNode) %lu must be the same size \n",sizeof(CGenNodeLatencyPktInfo),sizeof(CGenNode));
assert(0);
}
@@ -3472,6 +3521,24 @@ bool CGlobalTRex::Create(){
CGlobalInfo::init_pools(rx_mbuf);
ixgbe_start();
dump_config(stdout);
+
+ /* start stateless */
+ if (get_is_stateless()) {
+
+ TrexStatelessCfg cfg;
+
+ TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, global_platform_cfg_info.m_zmq_rpc_port);
+
+ cfg.m_port_count = CGlobalInfo::m_options.m_expected_portd;
+ cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg;
+ cfg.m_rpc_async_cfg = NULL;
+ cfg.m_rpc_server_verbose = false;
+ cfg.m_platform_api = new TrexDpdkPlatformApi();
+ cfg.m_publisher = &m_zmq_publisher;
+
+ m_trex_stateless = new TrexStateless(cfg);
+ }
+
return (true);
}
@@ -3483,9 +3550,6 @@ void CGlobalTRex::Delete(){
int CGlobalTRex::ixgbe_prob_init(void){
- uint8_t nb_ports;
-
-
m_max_ports = rte_eth_dev_count();
if (m_max_ports == 0)
rte_exit(EXIT_FAILURE, "No Ethernet ports - bye\n");
@@ -3664,17 +3728,17 @@ void CGlobalTRex::dump_post_test_stats(FILE *fd){
fprintf (fd," summary stats \n");
fprintf (fd," -------------- \n");
- fprintf (fd," Total-pkt-drop : %d pkts \n",(int64_t)(pkt_out-pkt_in));
- fprintf (fd," Total-tx-bytes : %llu bytes \n",pkt_out_bytes);
- fprintf (fd," Total-tx-sw-bytes : %llu bytes \n",sw_pkt_out_bytes);
- fprintf (fd," Total-rx-bytes : %llu byte \n",pkt_in_bytes);
+ fprintf (fd," Total-pkt-drop : %llu pkts \n",(unsigned long long)(pkt_out-pkt_in));
+ fprintf (fd," Total-tx-bytes : %llu bytes \n", (unsigned long long)pkt_out_bytes);
+ fprintf (fd," Total-tx-sw-bytes : %llu bytes \n", (unsigned long long)sw_pkt_out_bytes);
+ fprintf (fd," Total-rx-bytes : %llu byte \n", (unsigned long long)pkt_in_bytes);
fprintf (fd," \n");
- fprintf (fd," Total-tx-pkt : %llu pkts \n",pkt_out);
- fprintf (fd," Total-rx-pkt : %llu pkts \n",pkt_in);
- fprintf (fd," Total-sw-tx-pkt : %llu pkts \n",sw_pkt_out);
- fprintf (fd," Total-sw-err : %llu pkts \n",sw_pkt_out_err);
+ fprintf (fd," Total-tx-pkt : %llu pkts \n", (unsigned long long)pkt_out);
+ fprintf (fd," Total-rx-pkt : %llu pkts \n", (unsigned long long)pkt_in);
+ fprintf (fd," Total-sw-tx-pkt : %llu pkts \n", (unsigned long long)sw_pkt_out);
+ fprintf (fd," Total-sw-err : %llu pkts \n", (unsigned long long)sw_pkt_out_err);
if ( !CGlobalInfo::m_options.is_latency_disabled() ){
@@ -3713,7 +3777,8 @@ void CGlobalTRex::get_stats(CGlobalStats & stats){
int i;
float total_tx=0.0;
float total_rx=0.0;
- float total_pps=0.0;
+ float total_tx_pps=0.0;
+ float total_rx_pps=0.0;
stats.m_total_tx_pkts = 0;
stats.m_total_rx_pkts = 0;
@@ -3741,6 +3806,9 @@ void CGlobalTRex::get_stats(CGlobalStats & stats){
stp->ierrors = st.ierrors;
stp->oerrors = st.oerrors;
stp->m_total_tx_bps = _if->get_last_tx_rate()*_1Mb_DOUBLE;
+ stp->m_total_tx_pps = _if->get_last_tx_pps_rate();
+ stp->m_total_rx_bps = _if->get_last_rx_rate()*_1Mb_DOUBLE;
+ stp->m_total_rx_pps = _if->get_last_rx_pps_rate();
stats.m_total_tx_pkts += st.opackets;
stats.m_total_rx_pkts += st.ipackets;
@@ -3749,7 +3817,8 @@ void CGlobalTRex::get_stats(CGlobalStats & stats){
total_tx +=_if->get_last_tx_rate();
total_rx +=_if->get_last_rx_rate();
- total_pps +=_if->get_last_pps_rate();
+ total_tx_pps +=_if->get_last_tx_pps_rate();
+ total_rx_pps +=_if->get_last_rx_pps_rate();
}
@@ -3832,7 +3901,8 @@ void CGlobalTRex::get_stats(CGlobalStats & stats){
stats.m_tx_bps = total_tx*pf*_1Mb_DOUBLE;
stats.m_rx_bps = total_rx*pf*_1Mb_DOUBLE;
- stats.m_tx_pps = total_pps*pf;
+ stats.m_tx_pps = total_tx_pps*pf;
+ stats.m_rx_pps = total_rx_pps*pf;
stats.m_tx_cps = m_last_total_cps*pf;
stats.m_tx_expected_cps = m_expected_cps*pf;
@@ -3920,7 +3990,9 @@ int CGlobalTRex::run_in_master(){
std::string json;
bool was_stopped=false;
- m_trex_stateless->launch_control_plane();
+ if ( get_is_stateless() ) {
+ m_trex_stateless->launch_control_plane();
+ }
while ( true ) {
@@ -4033,6 +4105,9 @@ int CGlobalTRex::run_in_master(){
m_trex_stateless->generate_publish_snapshot(json);
m_zmq_publisher.publish_json(json);
+ /* check from messages from DP */
+ check_for_dp_messages();
+
delay(500);
if ( is_all_cores_finished() ) {
@@ -4040,6 +4115,11 @@ int CGlobalTRex::run_in_master(){
}
}
+ if (!is_all_cores_finished()) {
+ /* probably CLTR-C */
+ try_stop_all_dp();
+ }
+
m_mg.stop();
delay(1000);
if ( was_stopped ){
@@ -4081,7 +4161,7 @@ int CGlobalTRex::run_in_core(virtual_thread_id_t virt_core_id){
lpt = m_fl.m_threads_info[virt_core_id-1];
if (get_is_stateless()) {
- lpt->start_stateless_daemon();
+ lpt->start_stateless_daemon(*lp);
}else{
lpt->start_generate_stateful(CGlobalInfo::m_options.out_file,*lp);
}
@@ -4143,6 +4223,7 @@ int CGlobalTRex::stop_master(){
dump_post_test_stats(stdout);
m_fl.Delete();
+ return (0);
}
bool CGlobalTRex::is_all_cores_finished(){
@@ -4176,6 +4257,8 @@ int CGlobalTRex::start_master_stateless(){
lpt->m_node_gen.m_socket_id =m_cores_vif[i+1]->get_socket_id();
}
m_fl_was_init=true;
+
+ return (0);
}
@@ -4234,6 +4317,7 @@ int CGlobalTRex::start_send_master(){
}
m_fl_was_init=true;
+ return (0);
}
@@ -4374,7 +4458,6 @@ int update_global_info_from_platform_file(){
int update_dpdk_args(void){
- uint32_t cores_number;
CPlatformSocketInfo * lpsock=&CGlobalInfo::m_socket;
CParserOption * lpop= &CGlobalInfo::m_options;
@@ -4391,7 +4474,7 @@ int update_dpdk_args(void){
}
- sprintf(global_cores_str,"0x%x",lpsock->get_cores_mask());
+ sprintf(global_cores_str,"0x%llx",(unsigned long long)lpsock->get_cores_mask());
/* set the DPDK options */
global_dpdk_args_num =7;
@@ -4442,6 +4525,7 @@ int update_dpdk_args(void){
printf(" %s \n",global_dpdk_args[i]);
}
}
+ return (0);
}
@@ -4550,11 +4634,10 @@ int main_test(int argc , char * argv[]){
&& (CGlobalInfo::m_options.m_latency_prev>0) ){
uint32_t pkts = CGlobalInfo::m_options.m_latency_prev*
CGlobalInfo::m_options.m_latency_rate;
- printf("Start prev latency check - hack for Keren for %d sec \n",CGlobalInfo::m_options.m_latency_prev);
+ printf("Start prev latency check- for %d sec \n",CGlobalInfo::m_options.m_latency_prev);
g_trex.m_mg.start(pkts);
- printf("Delay now you can call command \n");
delay(CGlobalInfo::m_options.m_latency_prev* 1000);
- printf("Finish wating \n");
+ printf("Finished \n");
g_trex.m_mg.reset();
g_trex.reset_counters();
}
@@ -4725,14 +4808,13 @@ int CTRexExtendedDriverBase1G::configure_rx_filter_rules(CPhyEthIF * _if){
/* enable all rules */
_if->pci_reg_write(E1000_WUFC, (mask<<16) | (1<<14) );
+
+ return (0);
}
void CTRexExtendedDriverBase1G::get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats){
- int i;
- uint64_t t=0;
-
stats->ipackets += _if->pci_reg_read(E1000_GPRC) ;
stats->ibytes += (_if->pci_reg_read(E1000_GORCL) );
@@ -4862,6 +4944,7 @@ int CTRexExtendedDriverBase10G::configure_rx_filter_rules(CPhyEthIF * _if){
rte_exit(EXIT_FAILURE, " ERROR rte_eth_dev_fdir_add_perfect_filter : %d\n",res);
}
}
+ return (0);
}
int CTRexExtendedDriverBase10G::configure_drop_queue(CPhyEthIF * _if){
@@ -4984,6 +5067,8 @@ int CTRexExtendedDriverBase40G::configure_rx_filter_rules(CPhyEthIF * _if){
add_rules(_if,RTE_ETH_FLOW_TYPE_UDPV6,ttl);
add_rules(_if,RTE_ETH_FLOW_TYPE_TCPV6,ttl);
}
+
+ return (0);
}
@@ -5172,3 +5257,21 @@ TrexDpdkPlatformApi::get_dp_core_count() const {
return CGlobalInfo::m_options.preview.getCores();
}
+
+void
+TrexDpdkPlatformApi::port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list) const {
+
+ cores_id_list.clear();
+
+ /* iterate over all DP cores */
+ for (uint8_t core_id = 0; core_id < g_trex.get_cores_tx(); core_id++) {
+
+ /* iterate over all the directions*/
+ for (uint8_t dir = 0 ; dir < CS_NUM; dir++) {
+ if (g_trex.m_cores_vif[core_id + 1]->get_ports()[dir].m_port->get_port_id() == port_id) {
+ cores_id_list.push_back(std::make_pair(core_id, dir));
+ }
+ }
+ }
+}
+
diff --git a/src/msg_manager.cpp b/src/msg_manager.cpp
index 9f41d08c..5fe44771 100755
--- a/src/msg_manager.cpp
+++ b/src/msg_manager.cpp
@@ -51,15 +51,20 @@ bool CMessagingManager::Create(uint8_t num_dp_threads,std::string a_name){
return (true);
}
void CMessagingManager::Delete(){
- if (m_dp_to_cp) {
- m_dp_to_cp->Delete();
- delete []m_dp_to_cp;
- }
- if (m_cp_to_dp) {
- m_cp_to_dp->Delete();
- delete []m_cp_to_dp;
+
+ assert(m_cp_to_dp);
+ assert(m_dp_to_cp);
+ int i;
+ for (i=0; i<m_num_dp_threads; i++) {
+ CNodeRing * lp;
+ lp=getRingCpToDp(i);
+ lp->Delete();
+ lp=getRingDpToCp(i);
+ lp->Delete();
}
+ delete []m_dp_to_cp;
+ delete []m_cp_to_dp;
}
CNodeRing * CMessagingManager::getRingCpToDp(uint8_t thread_id){
@@ -76,6 +81,7 @@ CNodeRing * CMessagingManager::getRingDpToCp(uint8_t thread_id){
void CMsgIns::Free(){
if (m_ins) {
+ m_ins->Delete();
delete m_ins;
}
}
@@ -98,6 +104,12 @@ bool CMsgIns::Create(uint8_t num_threads){
}
+void CMsgIns::Delete(){
+ m_cp_dp.Delete();
+ m_rx_dp.Delete();
+}
+
+
CMsgIns * CMsgIns::m_ins=0;
diff --git a/src/msg_manager.h b/src/msg_manager.h
index 8958f826..0390ce10 100755
--- a/src/msg_manager.h
+++ b/src/msg_manager.h
@@ -98,6 +98,7 @@ public:
static CMsgIns * Ins();
static void Free();
bool Create(uint8_t num_threads);
+ void Delete();
public:
CMessagingManager * getRxDp(){
return (&m_rx_dp);
diff --git a/src/nat_check.cpp b/src/nat_check.cpp
index 676c1292..170d2de6 100755
--- a/src/nat_check.cpp
+++ b/src/nat_check.cpp
@@ -171,8 +171,8 @@ void CNatRxManager::handle_packet_ipv4(CNatOption * option,
}
-#define MYDP(f) if (f) fprintf(fd," %-40s: %llu \n",#f,f)
-#define MYDP_A(f) fprintf(fd," %-40s: %llu \n",#f,f)
+#define MYDP(f) if (f) fprintf(fd," %-40s: %llu \n",#f,(unsigned long long)f)
+#define MYDP_A(f) fprintf(fd," %-40s: %llu \n",#f, (unsigned long long)f)
diff --git a/src/pal/linux/mbuf.cpp b/src/pal/linux/mbuf.cpp
index 7eca8fd5..26a54fe9 100755
--- a/src/pal/linux/mbuf.cpp
+++ b/src/pal/linux/mbuf.cpp
@@ -78,6 +78,13 @@ rte_mempool_t * utl_rte_mempool_create(const char *name,
return p;
}
+void utl_rte_mempool_delete(rte_mempool_t * & pool){
+ if (pool) {
+ delete pool;
+ pool=0;
+ }
+}
+
uint16_t rte_mbuf_refcnt_update(rte_mbuf_t *m, int16_t value)
{
diff --git a/src/pal/linux/mbuf.h b/src/pal/linux/mbuf.h
index 35a442bf..4132f842 100755
--- a/src/pal/linux/mbuf.h
+++ b/src/pal/linux/mbuf.h
@@ -65,6 +65,8 @@ typedef struct rte_mempool rte_mempool_t;
#define RTE_PKTMBUF_HEADROOM 0
+void utl_rte_mempool_delete(rte_mempool_t * &pool);
+
rte_mempool_t * utl_rte_mempool_create(const char *name,
unsigned n,
unsigned elt_size,
diff --git a/src/pal/linux_dpdk/mbuf.h b/src/pal/linux_dpdk/mbuf.h
index cde01077..339c0909 100755
--- a/src/pal/linux_dpdk/mbuf.h
+++ b/src/pal/linux_dpdk/mbuf.h
@@ -30,6 +30,10 @@ typedef struct rte_mbuf rte_mbuf_t;
typedef struct rte_mempool rte_mempool_t;
+inline void utl_rte_mempool_delete(rte_mempool_t * & pool){
+}
+
+
rte_mempool_t * utl_rte_mempool_create(const char *name,
unsigned n,
unsigned elt_size,
diff --git a/src/platform_cfg.cpp b/src/platform_cfg.cpp
index 92ffefbd..547cc3ad 100755
--- a/src/platform_cfg.cpp
+++ b/src/platform_cfg.cpp
@@ -127,7 +127,7 @@ void CPlatformMemoryYamlInfo::Dump(FILE *fd){
int i=0;
for (i=0; i<MBUF_SIZE; i++) {
- fprintf(fd," %-40s : %lu \n",names[i].c_str(),m_mbuf[i]);
+ fprintf(fd," %-40s : %lu \n",names[i].c_str(), (ulong)m_mbuf[i]);
}
}
@@ -379,7 +379,7 @@ void CPlatformYamlInfo::Dump(FILE *fd){
}else{
fprintf(fd," port limit : not configured \n");
}
- fprintf(fd," port_bandwidth_gb : %lu \n",m_port_bandwidth_gb);
+ fprintf(fd," port_bandwidth_gb : %lu \n", (ulong)m_port_bandwidth_gb);
if ( m_if_mask_exist && m_if_mask.size() ) {
fprintf(fd," if_mask : ");
@@ -387,7 +387,7 @@ void CPlatformYamlInfo::Dump(FILE *fd){
for (i=0; i<(int)m_if_mask.size(); i++) {
fprintf(fd," %s,",m_if_mask[i].c_str());
}
- fprintf(fd,"\n",m_if_mask[i].c_str());
+ fprintf(fd,"\n");
}else{
fprintf(fd," if_mask : None \n");
@@ -414,7 +414,9 @@ void CPlatformYamlInfo::Dump(FILE *fd){
}
if ( m_telnet_exist ){
fprintf(fd," telnet_port : %d \n",m_telnet_port);
+
}
+ fprintf(fd," m_zmq_rpc_port : %d \n",m_zmq_rpc_port);
if ( m_mac_info_exist ){
int i;
diff --git a/src/platform_cfg.h b/src/platform_cfg.h
index b4b03b10..4fc3c3dd 100755
--- a/src/platform_cfg.h
+++ b/src/platform_cfg.h
@@ -180,11 +180,11 @@ public:
m_enable_zmq_pub_exist=false;
m_enable_zmq_pub=true;
m_zmq_pub_port=4500;
+ m_zmq_rpc_port = 4501;
m_telnet_exist=false;
m_telnet_port=4502 ;
- m_zmq_rpc_port = 5050;
m_mac_info_exist=false;
m_port_bandwidth_gb = 10;
diff --git a/src/publisher/trex_publisher.cpp b/src/publisher/trex_publisher.cpp
new file mode 100644
index 00000000..35653069
--- /dev/null
+++ b/src/publisher/trex_publisher.cpp
@@ -0,0 +1,107 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include "trex_publisher.h"
+#include <zmq.h>
+#include <assert.h>
+#include <sstream>
+#include <iostream>
+
+/**
+ * create the publisher
+ *
+ */
+bool
+TrexPublisher::Create(uint16_t port, bool disable){
+
+ if (disable) {
+ return (true);
+ }
+
+ m_context = zmq_ctx_new();
+ if ( m_context == 0 ) {
+ show_zmq_last_error("can't connect to ZMQ library");
+ }
+
+ m_publisher = zmq_socket (m_context, ZMQ_PUB);
+ if ( m_context == 0 ) {
+ show_zmq_last_error("can't create ZMQ socket");
+ }
+
+ std::stringstream ss;
+ ss << "tcp://*:" << port;
+
+ int rc = zmq_bind (m_publisher, ss.str().c_str());
+ if (rc != 0 ) {
+ show_zmq_last_error("can't bind to ZMQ socket at " + ss.str());
+ }
+
+ std::cout << "zmq publisher at: " << ss.str() << "\n";
+ return (true);
+}
+
+
+void
+TrexPublisher::Delete(){
+ if (m_publisher) {
+ zmq_close (m_publisher);
+ m_publisher = NULL;
+ }
+ if (m_context) {
+ zmq_ctx_destroy (m_context);
+ m_context = NULL;
+ }
+}
+
+
+void
+TrexPublisher::publish_json(const std::string &s){
+ if (m_publisher) {
+ int size = zmq_send (m_publisher, s.c_str(), s.length(), 0);
+ assert(size == s.length());
+ }
+}
+
+void
+TrexPublisher::publish_event(event_type_e type, const Json::Value &data) {
+ Json::FastWriter writer;
+ Json::Value value;
+ std::string s;
+
+ value["name"] = "trex-event";
+ value["type"] = type;
+ value["data"] = data;
+
+ s = writer.write(value);
+ publish_json(s);
+}
+
+/**
+ * error handling
+ *
+ */
+void
+TrexPublisher::show_zmq_last_error(const std::string &err){
+ std::cout << " ERROR " << err << "\n";
+ std::cout << " ZMQ: " << zmq_strerror (zmq_errno ());
+ exit(-1);
+}
+
diff --git a/src/publisher/trex_publisher.h b/src/publisher/trex_publisher.h
new file mode 100644
index 00000000..82603fda
--- /dev/null
+++ b/src/publisher/trex_publisher.h
@@ -0,0 +1,54 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#ifndef __TREX_PUBLISHER_H__
+#define __TREX_PUBLISHER_H__
+
+#include <stdint.h>
+#include <string>
+#include <json/json.h>
+
+class TrexPublisher {
+
+public:
+
+ TrexPublisher() {
+ m_context = NULL;
+ m_publisher = NULL;
+ }
+
+ bool Create(uint16_t port, bool disable);
+ void Delete();
+ void publish_json(const std::string &s);
+
+ enum event_type_e {
+ EVENT_PORT_STOPPED = 0
+ };
+
+ void publish_event(event_type_e type, const Json::Value &data);
+
+private:
+ void show_zmq_last_error(const std::string &err);
+private:
+ void * m_context;
+ void * m_publisher;
+};
+
+#endif /* __TREX_PUBLISHER_H__ */
diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
index b40e996f..1a7132ff 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
@@ -29,7 +29,7 @@ limitations under the License.
#include <iostream>
#include <unistd.h>
-#ifndef TREX_RPC_MOCK_SERVER
+#ifdef RTE_DPDK
#include <../linux_dpdk/version.h>
#endif
@@ -73,7 +73,7 @@ TrexRpcCmdGetVersion::_run(const Json::Value &params, Json::Value &result) {
Json::Value &section = result["result"];
- #ifndef TREX_RPC_MOCK_SERVER
+ #ifdef RTE_DPDK
section["version"] = VERSION_BUILD_NUM;
section["build_date"] = get_build_date();
@@ -222,12 +222,12 @@ TrexRpcCmdAcquire::_run(const Json::Value &params, Json::Value &result) {
/* if not free and not you and not force - fail */
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
- if ( (!port->is_free_to_aquire()) && (port->get_owner() != new_owner) && (!force)) {
- generate_execute_err(result, "port is already taken by '" + port->get_owner() + "'");
+ try {
+ port->acquire(new_owner, force);
+ } catch (const TrexRpcException &ex) {
+ generate_execute_err(result, ex.what());
}
- port->set_owner(new_owner);
-
result["result"] = port->get_owner_handler();
return (TREX_RPC_CMD_OK);
@@ -244,12 +244,12 @@ TrexRpcCmdRelease::_run(const Json::Value &params, Json::Value &result) {
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
- if (port->get_state() == TrexStatelessPort::PORT_STATE_TRANSMITTING) {
- generate_execute_err(result, "cannot release a port during transmission");
+ try {
+ port->release();
+ } catch (const TrexRpcException &ex) {
+ generate_execute_err(result, ex.what());
}
- port->clear_owner();
-
result["result"] = "ACK";
return (TREX_RPC_CMD_OK);
@@ -266,13 +266,13 @@ TrexRpcCmdGetPortStats::_run(const Json::Value &params, Json::Value &result) {
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
- if (port->get_state() == TrexStatelessPort::PORT_STATE_DOWN) {
- generate_execute_err(result, "cannot get stats - port is down");
- }
-
result["result"]["status"] = port->get_state_as_string();
- port->encode_stats(result["result"]);
+ try {
+ port->encode_stats(result["result"]);
+ } catch (const TrexRpcException &ex) {
+ generate_execute_err(result, ex.what());
+ }
return (TREX_RPC_CMD_OK);
}
@@ -303,7 +303,7 @@ TrexRpcCmdSyncUser::_run(const Json::Value &params, Json::Value &result) {
owned_port["streams"] = Json::arrayValue;
std::vector <TrexStream *> streams;
- port->get_stream_table()->get_object_list(streams);
+ port->get_object_list(streams);
for (auto stream : streams) {
owned_port["streams"].append(stream->get_stream_json());
diff --git a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
index 9854cad7..cdd13ed6 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
@@ -115,7 +115,12 @@ TrexRpcCmdAddStream::_run(const Json::Value &params, Json::Value &result) {
validate_stream(stream, result);
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(stream->m_port_id);
- port->get_stream_table()->add_stream(stream);
+
+ try {
+ port->add_stream(stream);
+ } catch (const TrexRpcException &ex) {
+ generate_execute_err(result, ex.what());
+ }
result["result"] = "ACK";
@@ -127,7 +132,7 @@ TrexRpcCmdAddStream::_run(const Json::Value &params, Json::Value &result) {
TrexStream *
TrexRpcCmdAddStream::allocate_new_stream(const Json::Value &section, uint8_t port_id, uint32_t stream_id, Json::Value &result) {
- TrexStream *stream;
+ TrexStream *stream = NULL;
const Json::Value &mode = parse_object(section, "mode", result);
std::string type = parse_string(mode, "type", result);
@@ -135,14 +140,22 @@ TrexRpcCmdAddStream::allocate_new_stream(const Json::Value &section, uint8_t por
if (type == "continuous") {
double pps = parse_double(mode, "pps", result);
- stream = new TrexStreamContinuous(port_id, stream_id, pps);
+ stream = new TrexStream( TrexStream::stCONTINUOUS, port_id, stream_id);
+ stream->set_pps(pps);
+
+ if (stream->m_next_stream_id != -1) {
+ generate_parse_err(result, "continious stream cannot provide next stream id - only -1 is valid");
+ }
} else if (type == "single_burst") {
uint32_t total_pkts = parse_int(mode, "total_pkts", result);
double pps = parse_double(mode, "pps", result);
- stream = new TrexStreamBurst(port_id, stream_id, total_pkts, pps);
+ stream = new TrexStream(TrexStream::stSINGLE_BURST,port_id, stream_id);
+ stream->set_pps(pps);
+ stream->set_single_burst(total_pkts);
+
} else if (type == "multi_burst") {
@@ -151,8 +164,10 @@ TrexRpcCmdAddStream::allocate_new_stream(const Json::Value &section, uint8_t por
uint32_t num_bursts = parse_int(mode, "number_of_bursts", result);
uint32_t pkts_per_burst = parse_int(mode, "pkts_per_burst", result);
- stream = new TrexStreamMultiBurst(port_id, stream_id, pkts_per_burst, pps, num_bursts, ibg_usec);
-
+ stream = new TrexStream(TrexStream::stMULTI_BURST,port_id, stream_id );
+ stream->set_pps(pps);
+ stream->set_multi_burst(pkts_per_burst,num_bursts,ibg_usec);
+
} else {
generate_parse_err(result, "bad stream type provided: '" + type + "'");
@@ -200,9 +215,9 @@ TrexRpcCmdAddStream::parse_vm_instr_flow_var(const Json::Value &inst, TrexStream
std::string min_value_str = parse_string(inst, "min_value", result);
std::string max_value_str = parse_string(inst, "max_value", result);
- uint64_t init_value;
- uint64_t min_value;
- uint64_t max_value;
+ uint64_t init_value = 0;
+ uint64_t min_value = 0;
+ uint64_t max_value = 0;
try {
init_value = str2num(init_value_str);
@@ -293,7 +308,7 @@ TrexRpcCmdAddStream::validate_stream(const TrexStream *stream, Json::Value &resu
TrexStatelessPort * port = get_stateless_obj()->get_port_by_id(stream->m_port_id);
/* does such a stream exists ? */
- if (port->get_stream_table()->get_stream_by_id(stream->m_stream_id)) {
+ if (port->get_stream_by_id(stream->m_stream_id)) {
std::stringstream ss;
ss << "stream " << stream->m_stream_id << " already exists";
delete stream;
@@ -319,7 +334,7 @@ TrexRpcCmdRemoveStream::_run(const Json::Value &params, Json::Value &result) {
}
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
- TrexStream *stream = port->get_stream_table()->get_stream_by_id(stream_id);
+ TrexStream *stream = port->get_stream_by_id(stream_id);
if (!stream) {
std::stringstream ss;
@@ -327,7 +342,12 @@ TrexRpcCmdRemoveStream::_run(const Json::Value &params, Json::Value &result) {
generate_execute_err(result, ss.str());
}
- port->get_stream_table()->remove_stream(stream);
+ try {
+ port->remove_stream(stream);
+ } catch (const TrexRpcException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+
delete stream;
result["result"] = "ACK";
@@ -350,12 +370,18 @@ TrexRpcCmdRemoveAllStreams::_run(const Json::Value &params, Json::Value &result)
generate_execute_err(result, ss.str());
}
- TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
- port->get_stream_table()->remove_and_delete_all_streams();
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+
+ try {
+ port->remove_and_delete_all_streams();
+ } catch (const TrexRpcException &ex) {
+ generate_execute_err(result, ex.what());
+ }
- result["result"] = "ACK";
- return (TREX_RPC_CMD_OK);
+ result["result"] = "ACK";
+
+ return (TREX_RPC_CMD_OK);
}
/***************************
@@ -377,7 +403,7 @@ TrexRpcCmdGetStreamList::_run(const Json::Value &params, Json::Value &result) {
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
- port->get_stream_table()->get_id_list(stream_list);
+ port->get_id_list(stream_list);
Json::Value json_list = Json::arrayValue;
@@ -397,8 +423,8 @@ TrexRpcCmdGetStreamList::_run(const Json::Value &params, Json::Value &result) {
**************************/
trex_rpc_cmd_rc_e
TrexRpcCmdGetStream::_run(const Json::Value &params, Json::Value &result) {
- uint8_t port_id = parse_byte(params, "port_id", result);
-
+ uint8_t port_id = parse_byte(params, "port_id", result);
+ bool get_pkt = parse_bool(params, "get_pkt", result);
uint32_t stream_id = parse_int(params, "stream_id", result);
if (port_id >= get_stateless_obj()->get_port_count()) {
@@ -409,7 +435,7 @@ TrexRpcCmdGetStream::_run(const Json::Value &params, Json::Value &result) {
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
- TrexStream *stream = port->get_stream_table()->get_stream_by_id(stream_id);
+ TrexStream *stream = port->get_stream_by_id(stream_id);
if (!stream) {
std::stringstream ss;
@@ -418,7 +444,12 @@ TrexRpcCmdGetStream::_run(const Json::Value &params, Json::Value &result) {
}
/* return the stored stream json (instead of decoding it all over again) */
- result["result"]["stream"] = stream->get_stream_json();
+ Json::Value j = stream->get_stream_json();
+ if (!get_pkt) {
+ j.removeMember("packet");
+ }
+
+ result["result"]["stream"] = j;
return (TREX_RPC_CMD_OK);
@@ -431,8 +462,9 @@ TrexRpcCmdGetStream::_run(const Json::Value &params, Json::Value &result) {
trex_rpc_cmd_rc_e
TrexRpcCmdStartTraffic::_run(const Json::Value &params, Json::Value &result) {
- uint8_t port_id = parse_byte(params, "port_id", result);
- double mul = parse_double(params, "mul", result);
+ uint8_t port_id = parse_byte(params, "port_id", result);
+ double mul = parse_double(params, "mul", result);
+ double duration = parse_double(params, "duration", result);
if (port_id >= get_stateless_obj()->get_port_count()) {
std::stringstream ss;
@@ -442,37 +474,121 @@ TrexRpcCmdStartTraffic::_run(const Json::Value &params, Json::Value &result) {
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
- TrexStatelessPort::rc_e rc = port->start_traffic(mul);
+ try {
+ port->start_traffic(mul, duration);
+ } catch (const TrexRpcException &ex) {
+ generate_execute_err(result, ex.what());
+ }
- if (rc == TrexStatelessPort::RC_OK) {
- result["result"] = "ACK";
- } else {
+ result["result"] = "ACK";
+
+ return (TREX_RPC_CMD_OK);
+}
+
+/***************************
+ * stop traffic on port
+ *
+ **************************/
+trex_rpc_cmd_rc_e
+TrexRpcCmdStopTraffic::_run(const Json::Value &params, Json::Value &result) {
+ uint8_t port_id = parse_byte(params, "port_id", result);
+
+ if (port_id >= get_stateless_obj()->get_port_count()) {
std::stringstream ss;
- switch (rc) {
- case TrexStatelessPort::RC_ERR_BAD_STATE_FOR_OP:
- ss << "bad state for operations: port is either transmitting traffic or down";
- break;
- case TrexStatelessPort::RC_ERR_NO_STREAMS:
- ss << "no active streams on that port";
- break;
- default:
- ss << "failed to start traffic";
- break;
+ ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1;
+ generate_execute_err(result, ss.str());
+ }
+
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+
+ try {
+ port->stop_traffic();
+ } catch (const TrexRpcException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+
+ result["result"] = "ACK";
+
+ return (TREX_RPC_CMD_OK);
+}
+
+/***************************
+ * get all streams
+ *
+ **************************/
+trex_rpc_cmd_rc_e
+TrexRpcCmdGetAllStreams::_run(const Json::Value &params, Json::Value &result) {
+ uint8_t port_id = parse_byte(params, "port_id", result);
+ bool get_pkt = parse_bool(params, "get_pkt", result);
+
+ if (port_id >= get_stateless_obj()->get_port_count()) {
+ std::stringstream ss;
+ ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1;
+ generate_execute_err(result, ss.str());
+ }
+
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+
+ std::vector <TrexStream *> streams;
+ port->get_object_list(streams);
+
+ Json::Value streams_json = Json::objectValue;
+ for (auto stream : streams) {
+
+ Json::Value j = stream->get_stream_json();
+
+ /* should we include the packet as well ? */
+ if (!get_pkt) {
+ j.removeMember("packet");
}
+ std::stringstream ss;
+ ss << stream->m_stream_id;
+
+ streams_json[ss.str()] = j;
+ }
+
+ result["result"]["streams"] = streams_json;
+
+ return (TREX_RPC_CMD_OK);
+}
+
+/***************************
+ * pause traffic
+ *
+ **************************/
+trex_rpc_cmd_rc_e
+TrexRpcCmdPauseTraffic::_run(const Json::Value &params, Json::Value &result) {
+
+ uint8_t port_id = parse_byte(params, "port_id", result);
+
+ if (port_id >= get_stateless_obj()->get_port_count()) {
+ std::stringstream ss;
+ ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1;
generate_execute_err(result, ss.str());
}
- return (TREX_RPC_CMD_OK);
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+
+ try {
+ port->pause_traffic();
+ } catch (const TrexRpcException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+
+ result["result"] = "ACK";
+
+ return (TREX_RPC_CMD_OK);
}
/***************************
- * start traffic on port
+ * resume traffic
*
**************************/
trex_rpc_cmd_rc_e
-TrexRpcCmdStopTraffic::_run(const Json::Value &params, Json::Value &result) {
- uint8_t port_id = parse_byte(params, "port_id", result);
+TrexRpcCmdResumeTraffic::_run(const Json::Value &params, Json::Value &result) {
+
+ uint8_t port_id = parse_byte(params, "port_id", result);
if (port_id >= get_stateless_obj()->get_port_count()) {
std::stringstream ss;
@@ -482,7 +598,41 @@ TrexRpcCmdStopTraffic::_run(const Json::Value &params, Json::Value &result) {
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
- port->stop_traffic();
+ try {
+ port->resume_traffic();
+ } catch (const TrexRpcException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+
+ result["result"] = "ACK";
+
+ return (TREX_RPC_CMD_OK);
+}
+
+/***************************
+ * update traffic
+ *
+ **************************/
+trex_rpc_cmd_rc_e
+TrexRpcCmdUpdateTraffic::_run(const Json::Value &params, Json::Value &result) {
+
+ uint8_t port_id = parse_byte(params, "port_id", result);
+ double mul = parse_double(params, "mul", result);
+
+ if (port_id >= get_stateless_obj()->get_port_count()) {
+ std::stringstream ss;
+ ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1;
+ generate_execute_err(result, ss.str());
+ }
+
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+
+ try {
+ port->update_traffic(mul);
+ } catch (const TrexRpcException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+
result["result"] = "ACK";
return (TREX_RPC_CMD_OK);
diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h
index 91c29548..b4f37e3b 100644
--- a/src/rpc-server/commands/trex_rpc_cmds.h
+++ b/src/rpc-server/commands/trex_rpc_cmds.h
@@ -99,11 +99,18 @@ void parse_vm_instr_write_flow_var(const Json::Value &inst, TrexStream *stream,
TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStreamList, "get_stream_list", 1, true);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetAllStreams, "get_all_streams", 2, true);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStream, "get_stream", 2, true);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStream, "get_stream", 3, true);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdStartTraffic, "start_traffic", 2, true);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdStopTraffic, "stop_traffic", 1, true);
+
+
+TREX_RPC_CMD_DEFINE(TrexRpcCmdStartTraffic, "start_traffic", 3, true);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdStopTraffic, "stop_traffic", 1, true);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdPauseTraffic, "pause_traffic", 1, true);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdResumeTraffic, "resume_traffic", 1, true);
+
+TREX_RPC_CMD_DEFINE(TrexRpcCmdUpdateTraffic, "update_traffic", 2, true);
TREX_RPC_CMD_DEFINE(TrexRpcCmdSyncUser, "sync_user", 2, false);
diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp
index 46281aff..a65bbccf 100644
--- a/src/rpc-server/trex_rpc_cmds_table.cpp
+++ b/src/rpc-server/trex_rpc_cmds_table.cpp
@@ -50,8 +50,13 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() {
register_command(new TrexRpcCmdRemoveAllStreams());
register_command(new TrexRpcCmdGetStreamList());
register_command(new TrexRpcCmdGetStream());
+ register_command(new TrexRpcCmdGetAllStreams());
+
register_command(new TrexRpcCmdStartTraffic());
register_command(new TrexRpcCmdStopTraffic());
+ register_command(new TrexRpcCmdPauseTraffic());
+ register_command(new TrexRpcCmdResumeTraffic());
+ register_command(new TrexRpcCmdUpdateTraffic());
}
TrexRpcCommandsTable::~TrexRpcCommandsTable() {
diff --git a/src/rpc-server/trex_rpc_req_resp_server.h b/src/rpc-server/trex_rpc_req_resp_server.h
index 1f638adf..bc38c0ef 100644
--- a/src/rpc-server/trex_rpc_req_resp_server.h
+++ b/src/rpc-server/trex_rpc_req_resp_server.h
@@ -43,7 +43,7 @@ private:
void handle_request(const std::string &request);
void handle_server_error(const std::string &specific_err);
- static const int RPC_MAX_MSG_SIZE = (20 * 1024);
+ static const int RPC_MAX_MSG_SIZE = (200 * 1024);
void *m_context;
void *m_socket;
uint8_t m_msg_buffer[RPC_MAX_MSG_SIZE];
diff --git a/src/rx_check.cpp b/src/rx_check.cpp
index 3a67ca23..59b42e1a 100755
--- a/src/rx_check.cpp
+++ b/src/rx_check.cpp
@@ -45,8 +45,8 @@ void CRxCheckFlowTableStats::Clear(){
}
-#define MYDP(f) if (f) fprintf(fd," %-40s: %llu \n",#f,f)
-#define MYDP_A(f) fprintf(fd," %-40s: %llu \n",#f,f)
+#define MYDP(f) if (f) fprintf(fd," %-40s: %llu \n",#f,(unsigned long long)f)
+#define MYDP_A(f) fprintf(fd," %-40s: %llu \n",#f,(unsigned long long)f)
#define MYDP_J(f) json+=add_json(#f,f);
#define MYDP_J_LAST(f) json+=add_json(#f,f,true);
@@ -146,7 +146,7 @@ void CRxCheckFlowTableMap::dump_all(FILE *fd){
rx_check_flow_map_iter_t it;
for (it= m_map.begin(); it != m_map.end(); ++it) {
CRxCheckFlow *lp = it->second;
- printf ("flow_id: %d \n",lp->m_flow_id);
+ printf ("flow_id: %llu \n",(unsigned long long)lp->m_flow_id);
}
}
@@ -208,7 +208,7 @@ std::string CPerTxthreadTemplateInfo::dump_as_json(std::string name){
int i;
for (i=0;i<MAX_TEMPLATES_STATS;i++){
char buff[200];
- sprintf(buff,"%llu",m_template_info[i]);
+ sprintf(buff,"%llu", (unsigned long long)m_template_info[i]);
json+=std::string(buff);
if ( i < MAX_TEMPLATES_STATS-1) {
json+=std::string(",");
@@ -231,7 +231,7 @@ void CPerTxthreadTemplateInfo::Dump(FILE *fd){
int i;
for (i=0; i<MAX_TEMPLATES_STATS; i++) {
if (m_template_info[i]) {
- fprintf (fd," template id: %llu %llu \n",i,m_template_info[i]);
+ fprintf (fd," template id: %d %llu \n",i, (unsigned long long)m_template_info[i]);
}
}
}
@@ -484,7 +484,7 @@ void RxCheckManager::DumpTemplate(FILE *fd,bool verbose){
if (cnt==0){
fprintf(fd,"\n");
}
- fprintf(fd,"[id:%2d val:%8d,rx:%8d], ",i,lp->get_error_counter(),lp->get_rx_counter());
+ fprintf(fd,"[id:%2d val:%8llu,rx:%8llu], ",i, (unsigned long long)lp->get_error_counter(), (unsigned long long)lp->get_rx_counter());
cnt++;
if (cnt>5) {
cnt=0;
@@ -500,7 +500,11 @@ void RxCheckManager::DumpTemplateFull(FILE *fd){
int i;
for (i=0; i<MAX_TEMPLATES_STATS;i++ ) {
CPerTemplateInfo * lp=get_template(i);
- fprintf(fd," template_id_%2d , errors:%8d, jitter: %lu rx : %lu \n",i,lp->get_error_counter(),lp->get_jitter_usec(),lp->get_rx_counter() );
+ fprintf(fd," template_id_%2d , errors:%8llu, jitter: %llu rx : %llu \n",
+ i,
+ (unsigned long long)lp->get_error_counter(),
+ (unsigned long long)lp->get_jitter_usec(),
+ (unsigned long long)lp->get_rx_counter() );
}
}
@@ -514,7 +518,11 @@ void RxCheckManager::DumpShort(FILE *fd){
DumpTemplate(fd,false);
fprintf(fd,"\n");
fprintf(fd,"---\n");
- fprintf(fd," active flows: %8d, fif: %8d, drop: %8d, errors: %8d \n",m_stats.m_active,m_stats.m_fif,m_stats.m_err_drop,m_stats.get_total_err());
+ fprintf(fd," active flows: %8llu, fif: %8llu, drop: %8llu, errors: %8llu \n",
+ (unsigned long long)m_stats.m_active,
+ (unsigned long long)m_stats.m_fif,
+ (unsigned long long)m_stats.m_err_drop,
+ (unsigned long long)m_stats.get_total_err());
fprintf(fd,"------------------------------------------------------------------------------------------------------------\n");
}
diff --git a/src/rx_check.h b/src/rx_check.h
index 6f9763a2..07f5684c 100755
--- a/src/rx_check.h
+++ b/src/rx_check.h
@@ -30,9 +30,10 @@ limitations under the License.
typedef enum {
- CLIENT_SIDE=0,
- SERVER_SIDE=1,
- CS_NUM=2
+ CLIENT_SIDE = 0,
+ SERVER_SIDE = 1,
+ CS_NUM = 2,
+ CS_INVALID = 255
} pkt_dir_enum_t;
typedef uint8_t pkt_dir_t ;
diff --git a/src/rx_check_header.cpp b/src/rx_check_header.cpp
index 8ee580db..5934ee15 100755
--- a/src/rx_check_header.cpp
+++ b/src/rx_check_header.cpp
@@ -42,11 +42,11 @@ void CRx_check_header::dump(FILE *fd){
void CNatOption::dump(FILE *fd){
- fprintf(fd," op : %lx \n",get_option_type());
- fprintf(fd," ol : %lx \n",get_option_len());
- fprintf(fd," thread_id : %lx \n",get_thread_id());
- fprintf(fd," magic : %lx \n",get_magic());
- fprintf(fd," fid : %lx \n",get_fid());
+ fprintf(fd," op : %x \n",get_option_type());
+ fprintf(fd," ol : %x \n",get_option_len());
+ fprintf(fd," thread_id : %x \n",get_thread_id());
+ fprintf(fd," magic : %x \n",get_magic());
+ fprintf(fd," fid : %x \n",get_fid());
utl_DumpBuffer(stdout,(void *)&u.m_data[0],8,0);
}
diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp
new file mode 100644
index 00000000..ba327e59
--- /dev/null
+++ b/src/stateless/cp/trex_dp_port_events.cpp
@@ -0,0 +1,220 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include <trex_dp_port_events.h>
+#include <sstream>
+#include <os_time.h>
+#include <trex_stateless.h>
+
+/**
+ * port events
+ */
+void
+TrexDpPortEvents::create(TrexStatelessPort *port) {
+ m_port = port;
+
+ for (int i = 0; i < TrexDpPortEvent::EVENT_MAX; i++) {
+ m_events[i].create((TrexDpPortEvent::event_e) i, port);
+ }
+
+ m_event_id_counter = EVENT_ID_INVALID;
+}
+
+/**
+ * generate a new event ID
+ *
+ */
+int
+TrexDpPortEvents::generate_event_id() {
+ return (++m_event_id_counter);
+}
+
+/**
+ * mark the next allowed event
+ * all other events will be disabled
+ *
+ */
+void
+TrexDpPortEvents::wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms) {
+
+ /* first disable all events */
+ for (TrexDpPortEvent & e : m_events) {
+ e.disable();
+ }
+
+ /* mark this event as allowed */
+ m_events[ev].wait_for_event(event_id, timeout_ms);
+}
+
+void
+TrexDpPortEvents::disable(TrexDpPortEvent::event_e ev) {
+ m_events[ev].disable();
+}
+
+/**
+ * handle an event
+ *
+ */
+void
+TrexDpPortEvents::handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id) {
+ m_events[ev].handle_event(thread_id, event_id);
+}
+
+/***********
+ * single event object
+ *
+ */
+
+void
+TrexDpPortEvent::create(event_e type, TrexStatelessPort *port) {
+ m_event_type = type;
+ m_port = port;
+
+ /* add the core ids to the hash */
+ m_signal.clear();
+ for (int core_id : m_port->get_core_id_list()) {
+ m_signal[core_id] = false;
+ }
+
+ /* event is disabled */
+ disable();
+}
+
+
+/**
+ * wait the event using event id and timeout
+ *
+ */
+void
+TrexDpPortEvent::wait_for_event(int event_id, int timeout_ms) {
+
+ /* set a new event id */
+ m_event_id = event_id;
+
+ /* do we have a timeout ? */
+ if (timeout_ms > 0) {
+ m_expire_limit_ms = os_get_time_msec() + timeout_ms;
+ } else {
+ m_expire_limit_ms = -1;
+ }
+
+ /* prepare the signal array */
+ m_pending_cnt = 0;
+ for (auto & core_pair : m_signal) {
+ core_pair.second = false;
+ m_pending_cnt++;
+ }
+}
+
+void
+TrexDpPortEvent::disable() {
+ m_event_id = TrexDpPortEvents::EVENT_ID_INVALID;
+}
+
+/**
+ * get the event status
+ *
+ */
+
+TrexDpPortEvent::event_status_e
+TrexDpPortEvent::status() {
+
+ /* is it even active ? */
+ if (m_event_id == TrexDpPortEvents::EVENT_ID_INVALID) {
+ return (EVENT_DISABLE);
+ }
+
+ /* did it occured ? */
+ if (m_pending_cnt == 0) {
+ return (EVENT_OCCURED);
+ }
+
+ /* so we are enabled and the event did not occur - maybe we timed out ? */
+ if ( (m_expire_limit_ms > 0) && (os_get_time_msec() > m_expire_limit_ms) ) {
+ return (EVENT_TIMED_OUT);
+ }
+
+ /* so we are still waiting... */
+ return (EVENT_PENDING);
+
+}
+
+void
+TrexDpPortEvent::err(int thread_id, int event_id, const std::string &err_msg) {
+ std::stringstream err;
+ err << "DP event '" << event_name(m_event_type) << "' on thread id '" << thread_id << "' with key '" << event_id <<"' - ";
+}
+
+/**
+ * event occured
+ *
+ */
+void
+TrexDpPortEvent::handle_event(int thread_id, int event_id) {
+
+ /* if the event is disabled - we don't care */
+ if (!is_active()) {
+ return;
+ }
+
+ /* check the event id is matching the required event - if not maybe its an old signal */
+ if (event_id != m_event_id) {
+ return;
+ }
+
+ /* mark sure no double signal */
+ if (m_signal.at(thread_id)) {
+ err(thread_id, event_id, "double signal");
+
+ } else {
+ /* mark */
+ m_signal.at(thread_id) = true;
+ m_pending_cnt--;
+ }
+
+ /* event occured */
+ if (m_pending_cnt == 0) {
+ m_port->on_dp_event_occured(m_event_type);
+ m_event_id = TrexDpPortEvents::EVENT_ID_INVALID;
+ }
+}
+
+bool
+TrexDpPortEvent::is_active() {
+ return (status() != EVENT_DISABLE);
+}
+
+bool
+TrexDpPortEvent::has_timeout_expired() {
+ return (status() == EVENT_TIMED_OUT);
+}
+
+const char *
+TrexDpPortEvent::event_name(event_e type) {
+ switch (type) {
+ case EVENT_STOP:
+ return "DP STOP";
+
+ default:
+ throw TrexException("unknown event type");
+ }
+
+}
diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h
new file mode 100644
index 00000000..557e590b
--- /dev/null
+++ b/src/stateless/cp/trex_dp_port_events.h
@@ -0,0 +1,171 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#ifndef __TREX_DP_PORT_EVENTS_H__
+#define __TREX_DP_PORT_EVENTS_H__
+
+#include <unordered_map>
+#include <string>
+
+class TrexStatelessPort;
+
+/**
+ * describes a single DP event related to port
+ *
+ * @author imarom (18-Nov-15)
+ */
+class TrexDpPortEvent {
+public:
+
+ enum event_e {
+ EVENT_STOP = 1,
+ EVENT_MAX
+ };
+
+ /**
+ * status of the event for the port
+ */
+ enum event_status_e {
+ EVENT_DISABLE,
+ EVENT_PENDING,
+ EVENT_TIMED_OUT,
+ EVENT_OCCURED
+ };
+
+ /**
+ * init for the event
+ *
+ */
+ void create(event_e type, TrexStatelessPort *port);
+
+ /**
+ * create a new pending event
+ *
+ */
+ void wait_for_event(int event_id, int timeout_ms = -1);
+
+ /**
+ * mark event as not allowed to happen
+ *
+ */
+ void disable();
+
+ /**
+ * get the event status
+ *
+ */
+ event_status_e status();
+
+ /**
+ * event occured
+ *
+ */
+ void handle_event(int thread_id, int event_id);
+
+ /**
+ * returns true if event is active
+ *
+ */
+ bool is_active();
+
+ /**
+ * has timeout already expired ?
+ *
+ */
+ bool has_timeout_expired();
+
+ /**
+ * generate error
+ *
+ */
+ void err(int thread_id, int event_id, const std::string &err_msg);
+
+ /**
+ * event to name
+ *
+ */
+ static const char * event_name(event_e type);
+
+
+private:
+
+ event_e m_event_type;
+ std::unordered_map<int, bool> m_signal;
+ int m_pending_cnt;
+
+ TrexStatelessPort *m_port;
+ int m_event_id;
+ int m_expire_limit_ms;
+
+};
+
+/**
+ * all the events related to a port
+ *
+ */
+class TrexDpPortEvents {
+public:
+ friend class TrexDpPortEvent;
+
+ void create(TrexStatelessPort *port);
+
+ /**
+ * generate a new event ID to be used with wait_for_event
+ *
+ */
+ int generate_event_id();
+
+ /**
+ * wait a new DP event on the port
+ * returns a key which will be used to identify
+ * the event happened
+ *
+ * @author imarom (18-Nov-15)
+ *
+ * @param ev - type of event
+ * @param event_id - a unique identifier for the event
+ * @param timeout_ms - does it has a timeout ?
+ *
+ */
+ void wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms = -1);
+
+ /**
+ * disable an event (don't care)
+ *
+ */
+ void disable(TrexDpPortEvent::event_e ev);
+
+ /**
+ * event has occured
+ *
+ */
+ void handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id);
+
+private:
+ static const int EVENT_ID_INVALID = -1;
+
+ TrexDpPortEvent m_events[TrexDpPortEvent::EVENT_MAX];
+ int m_event_id_counter;
+
+ TrexStatelessPort *m_port;
+
+};
+
+#endif /* __TREX_DP_PORT_EVENTS_H__ */
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp
index e0e95450..a4522837 100644
--- a/src/stateless/cp/trex_stateless.cpp
+++ b/src/stateless/cp/trex_stateless.cpp
@@ -47,10 +47,12 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) {
m_port_count = cfg.m_port_count;
for (int i = 0; i < m_port_count; i++) {
- m_ports.push_back(new TrexStatelessPort(i));
+ m_ports.push_back(new TrexStatelessPort(i, cfg.m_platform_api));
}
m_platform_api = cfg.m_platform_api;
+ m_publisher = cfg.m_publisher;
+
}
/**
diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h
index 57c6ef1d..5c11be1e 100644
--- a/src/stateless/cp/trex_stateless.h
+++ b/src/stateless/cp/trex_stateless.h
@@ -30,6 +30,7 @@ limitations under the License.
#include <trex_stream.h>
#include <trex_stateless_port.h>
#include <trex_rpc_server_api.h>
+#include <publisher/trex_publisher.h>
#include <internal_api/trex_platform_api.h>
@@ -93,6 +94,7 @@ public:
m_rpc_async_cfg = NULL;
m_rpc_server_verbose = false;
m_platform_api = NULL;
+ m_publisher = NULL;
}
const TrexRpcServerConfig *m_rpc_req_resp_cfg;
@@ -100,6 +102,7 @@ public:
const TrexPlatformApi *m_platform_api;
bool m_rpc_server_verbose;
uint8_t m_port_count;
+ TrexPublisher *m_publisher;
};
/**
@@ -150,6 +153,10 @@ public:
return (m_platform_api);
}
+ TrexPublisher * get_publisher() {
+ return m_publisher;
+ }
+
const std::vector <TrexStatelessPort *> get_port_list() {
return m_ports;
}
@@ -170,6 +177,8 @@ protected:
/* platform API */
const TrexPlatformApi *m_platform_api;
+ TrexPublisher *m_publisher;
+
std::mutex m_global_cp_lock;
};
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index a0b57b63..40392e68 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -27,45 +27,89 @@ limitations under the License.
#include <string>
#ifndef TREX_RPC_MOCK_SERVER
+
// DPDK c++ issue
-#define UINT8_MAX 255
-#define UINT16_MAX 0xFFFF
+#ifndef UINT8_MAX
+ #define UINT8_MAX 255
+#endif
+
+#ifndef UINT16_MAX
+ #define UINT16_MAX 0xFFFF
+#endif
+
// DPDK c++ issue
#endif
#include <rte_ethdev.h>
#include <os_time.h>
+void
+port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list);
+
using namespace std;
/***************************
* trex stateless port
*
**************************/
-TrexStatelessPort::TrexStatelessPort(uint8_t port_id) : m_port_id(port_id) {
- m_port_state = PORT_STATE_UP_IDLE;
+TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) {
+ std::vector<std::pair<uint8_t, uint8_t>> core_pair_list;
+
+ m_port_id = port_id;
+
+ m_port_state = PORT_STATE_IDLE;
clear_owner();
+
+ /* get the DP cores belonging to this port */
+ api->port_id_to_cores(m_port_id, core_pair_list);
+
+ for (auto core_pair : core_pair_list) {
+
+ /* send the core id */
+ m_cores_id_list.push_back(core_pair.first);
+ }
+
+ /* init the events DP DB */
+ m_dp_events.create(this);
}
/**
- * starts the traffic on the port
+ * acquire the port
+ *
+ * @author imarom (09-Nov-15)
*
+ * @param user
+ * @param force
*/
-TrexStatelessPort::rc_e
-TrexStatelessPort::start_traffic(double mul) {
-
- if (m_port_state != PORT_STATE_UP_IDLE) {
- return (RC_ERR_BAD_STATE_FOR_OP);
+void
+TrexStatelessPort::acquire(const std::string &user, bool force) {
+ if ( (!is_free_to_aquire()) && (get_owner() != user) && (!force)) {
+ throw TrexRpcException("port is already taken by '" + get_owner() + "'");
}
- if (get_stream_table()->size() == 0) {
- return (RC_ERR_NO_STREAMS);
- }
+ set_owner(user);
+}
+
+void
+TrexStatelessPort::release(void) {
+ verify_state( ~(PORT_STATE_TX | PORT_STATE_PAUSE) );
+ clear_owner();
+}
+
+/**
+ * starts the traffic on the port
+ *
+ */
+void
+TrexStatelessPort::start_traffic(double mul, double duration) {
+
+ /* command allowed only on state stream */
+ verify_state(PORT_STATE_STREAMS);
/* fetch all the streams from the table */
vector<TrexStream *> streams;
- get_stream_table()->get_object_list(streams);
+ get_object_list(streams);
/* compiler it */
TrexStreamsCompiler compiler;
@@ -73,68 +117,121 @@ TrexStatelessPort::start_traffic(double mul) {
bool rc = compiler.compile(streams, *compiled_obj);
if (!rc) {
- return (RC_ERR_FAILED_TO_COMPILE_STREAMS);
+ throw TrexRpcException("Failed to compile streams");
}
/* generate a message to all the relevant DP cores to start transmitting */
- TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(compiled_obj);
+ int event_id = m_dp_events.generate_event_id();
+ /* mark that DP event of stoppped is possible */
+ m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, event_id);
- // FIXME (add the right core list)
- CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(0);
+ TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_obj, duration);
- ring->Enqueue((CGenNode *)start_msg);
+ m_last_all_streams_continues = compiled_obj->get_all_streams_continues();
+ m_last_duration =duration;
- /* move the state to transmiting */
- m_port_state = PORT_STATE_TRANSMITTING;
+ change_state(PORT_STATE_TX);
- return (RC_OK);
+ send_message_to_dp(start_msg);
+
}
-TrexStatelessPort::rc_e
+/**
+ * stop traffic on port
+ *
+ * @author imarom (09-Nov-15)
+ *
+ * @return TrexStatelessPort::rc_e
+ */
+void
TrexStatelessPort::stop_traffic(void) {
- /* real code goes here */
- if (m_port_state != PORT_STATE_TRANSMITTING) {
- return (RC_ERR_BAD_STATE_FOR_OP);
+ if (!( (m_port_state == PORT_STATE_TX)
+ || (m_port_state ==PORT_STATE_PAUSE) )) {
+ return;
}
+ /* mask out the DP stop event */
+ m_dp_events.disable(TrexDpPortEvent::EVENT_STOP);
+
/* generate a message to all the relevant DP cores to start transmitting */
TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id);
- // FIXME (add the right core list)
- CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(0);
+ send_message_to_dp(stop_msg);
- ring->Enqueue((CGenNode *)stop_msg);
+ change_state(PORT_STATE_STREAMS);
+
+}
- m_port_state = PORT_STATE_UP_IDLE;
+void
+TrexStatelessPort::pause_traffic(void) {
+
+ verify_state(PORT_STATE_TX);
- return (RC_OK);
+ if (m_last_all_streams_continues == false) {
+ throw TrexRpcException(" pause is supported when all streams are in continues mode ");
+ }
+
+ if ( m_last_duration>0.0 ) {
+ throw TrexRpcException(" pause is supported when duration is not enable is start command ");
+ }
+
+ TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpPause(m_port_id);
+
+ send_message_to_dp(stop_msg);
+
+ change_state(PORT_STATE_PAUSE);
}
-/**
-* access the stream table
-*
-*/
-TrexStreamTable * TrexStatelessPort::get_stream_table() {
- return &m_stream_table;
+void
+TrexStatelessPort::resume_traffic(void) {
+
+ verify_state(PORT_STATE_PAUSE);
+
+ /* generate a message to all the relevant DP cores to start transmitting */
+ TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpResume(m_port_id);
+
+ send_message_to_dp(stop_msg);
+
+ change_state(PORT_STATE_TX);
}
+void
+TrexStatelessPort::update_traffic(double mul) {
+
+ verify_state(PORT_STATE_STREAMS | PORT_STATE_TX | PORT_STATE_PAUSE);
+
+ #if 0
+ /* generate a message to all the relevant DP cores to start transmitting */
+ TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id);
+
+ send_message_to_dp(stop_msg);
+
+ m_port_state = PORT_STATE_UP_IDLE;
+ #endif
+}
std::string
-TrexStatelessPort::get_state_as_string() {
+TrexStatelessPort::get_state_as_string() const {
switch (get_state()) {
case PORT_STATE_DOWN:
- return "down";
+ return "DOWN";
+
+ case PORT_STATE_IDLE:
+ return "IDLE";
- case PORT_STATE_UP_IDLE:
- return "idle";
+ case PORT_STATE_STREAMS:
+ return "STREAMS";
- case PORT_STATE_TRANSMITTING:
- return "transmitting";
+ case PORT_STATE_TX:
+ return "TX";
+
+ case PORT_STATE_PAUSE:
+ return "PAUSE";
}
- return "unknown";
+ return "UNKNOWN";
}
void
@@ -145,6 +242,24 @@ TrexStatelessPort::get_properties(string &driver, string &speed) {
speed = "1 Gbps";
}
+bool
+TrexStatelessPort::verify_state(int state, bool should_throw) const {
+ if ( (state & m_port_state) == 0 ) {
+ if (should_throw) {
+ throw TrexRpcException("command cannot be executed on current state: '" + get_state_as_string() + "'");
+ } else {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+void
+TrexStatelessPort::change_state(port_state_e new_state) {
+
+ m_port_state = new_state;
+}
/**
* generate a random connection handler
@@ -191,3 +306,39 @@ TrexStatelessPort::encode_stats(Json::Value &port) {
port["tx_rx_errors"] = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors);
}
+void
+TrexStatelessPort::send_message_to_dp(TrexStatelessCpToDpMsgBase *msg) {
+
+ for (auto core_id : m_cores_id_list) {
+
+ /* send the message to the core */
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id);
+ ring->Enqueue((CGenNode *)msg->clone());
+ }
+
+}
+
+/**
+ * when a DP (async) event occurs - handle it
+ *
+ */
+void
+TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) {
+ Json::Value data;
+
+ switch (event_type) {
+
+ case TrexDpPortEvent::EVENT_STOP:
+ /* set a stop event */
+ change_state(PORT_STATE_STREAMS);
+ /* send a ZMQ event */
+
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data);
+ break;
+
+ default:
+ assert(0);
+
+ }
+}
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 3e071954..006ec97c 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -22,6 +22,10 @@ limitations under the License.
#define __TREX_STATELESS_PORT_H__
#include <trex_stream.h>
+#include <trex_dp_port_events.h>
+
+class TrexPlatformApi;
+class TrexStatelessCpToDpMsgBase;
/**
* describes a stateless port
@@ -29,15 +33,19 @@ limitations under the License.
* @author imarom (31-Aug-15)
*/
class TrexStatelessPort {
+ friend class TrexDpPortEvent;
+
public:
/**
* port state
*/
enum port_state_e {
- PORT_STATE_DOWN,
- PORT_STATE_UP_IDLE,
- PORT_STATE_TRANSMITTING
+ PORT_STATE_DOWN = 0x1,
+ PORT_STATE_IDLE = 0x2,
+ PORT_STATE_STREAMS = 0x4,
+ PORT_STATE_TX = 0x8,
+ PORT_STATE_PAUSE = 0x10,
};
/**
@@ -50,31 +58,55 @@ public:
RC_ERR_FAILED_TO_COMPILE_STREAMS
};
- TrexStatelessPort(uint8_t port_id);
+ TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api);
+
+ /**
+ * acquire port
+ * throws TrexException in case of an error
+ */
+ void acquire(const std::string &user, bool force = false);
+
+ /**
+ * release the port from the current user
+ * throws TrexException in case of an error
+ */
+ void release(void);
/**
* start traffic
- *
+ * throws TrexException in case of an error
*/
- rc_e start_traffic(double mul);
+ void start_traffic(double mul, double duration = -1);
/**
* stop traffic
- *
+ * throws TrexException in case of an error
+ */
+ void stop_traffic(void);
+
+ /**
+ * pause traffic
+ * throws TrexException in case of an error
+ */
+ void pause_traffic(void);
+
+ /**
+ * resume traffic
+ * throws TrexException in case of an error
*/
- rc_e stop_traffic(void);
+ void resume_traffic(void);
/**
- * access the stream table
+ * update current traffic on port
*
*/
- TrexStreamTable *get_stream_table();
+ void update_traffic(double mul);
/**
* get the port state
*
*/
- port_state_e get_state() {
+ port_state_e get_state() const {
return m_port_state;
}
@@ -82,7 +114,7 @@ public:
* port state as string
*
*/
- std::string get_state_as_string();
+ std::string get_state_as_string() const;
/**
* fill up properties of the port
@@ -94,6 +126,7 @@ public:
*/
void get_properties(std::string &driver, std::string &speed);
+
/**
* query for ownership
*
@@ -111,10 +144,75 @@ public:
return m_owner_handler;
}
- bool is_free_to_aquire() {
- return (m_owner == "none");
+
+ bool verify_owner_handler(const std::string &handler) {
+
+ return ( (m_owner != "none") && (m_owner_handler == handler) );
+
+ }
+
+ /**
+ * encode stats as JSON
+ */
+ void encode_stats(Json::Value &port);
+
+ uint8_t get_port_id() {
+ return m_port_id;
+ }
+
+ /**
+ * delegators
+ *
+ */
+
+ void add_stream(TrexStream *stream) {
+ verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS);
+
+ m_stream_table.add_stream(stream);
+
+ change_state(PORT_STATE_STREAMS);
}
+ void remove_stream(TrexStream *stream) {
+ verify_state(PORT_STATE_STREAMS);
+
+ m_stream_table.remove_stream(stream);
+
+ if (m_stream_table.size() == 0) {
+ change_state(PORT_STATE_IDLE);
+ }
+ }
+
+ void remove_and_delete_all_streams() {
+ verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS);
+
+ m_stream_table.remove_and_delete_all_streams();
+
+ change_state(PORT_STATE_IDLE);
+ }
+
+ TrexStream * get_stream_by_id(uint32_t stream_id) {
+ return m_stream_table.get_stream_by_id(stream_id);
+ }
+
+ void get_id_list(std::vector<uint32_t> &id_list) {
+ m_stream_table.get_id_list(id_list);
+ }
+
+ void get_object_list(std::vector<TrexStream *> &object_list) {
+ m_stream_table.get_object_list(object_list);
+ }
+
+ TrexDpPortEvents & get_dp_events() {
+ return m_dp_events;
+ }
+
+
+
+private:
+
+
+
/**
* take ownership of the server array
* this is static
@@ -131,30 +229,43 @@ public:
m_owner_handler = "";
}
- bool verify_owner_handler(const std::string &handler) {
+ bool is_free_to_aquire() {
+ return (m_owner == "none");
+ }
- return ( (m_owner != "none") && (m_owner_handler == handler) );
+ const std::vector<int> get_core_id_list () {
+ return m_cores_id_list;
}
+ bool verify_state(int state, bool should_throw = true) const;
+
+ void change_state(port_state_e new_state);
+
+ std::string generate_handler();
+
+ void send_message_to_dp(TrexStatelessCpToDpMsgBase *msg);
+
/**
- * encode stats as JSON
+ * triggered when event occurs
+ *
*/
- void encode_stats(Json::Value &port);
+ void on_dp_event_occured(TrexDpPortEvent::event_e event_type);
- uint8_t get_port_id() {
- return m_port_id;
- }
-private:
+ TrexStreamTable m_stream_table;
+ uint8_t m_port_id;
+ port_state_e m_port_state;
+ std::string m_owner;
+ std::string m_owner_handler;
- std::string generate_handler();
+ /* holds the DP cores associated with this port */
+ std::vector<int> m_cores_id_list;
+
+ bool m_last_all_streams_continues;
+ double m_last_duration;
- TrexStreamTable m_stream_table;
- uint8_t m_port_id;
- port_state_e m_port_state;
- std::string m_owner;
- std::string m_owner_handler;
+ TrexDpPortEvents m_dp_events;
};
#endif /* __TREX_STATELESS_PORT_H__ */
diff --git a/src/stateless/cp/trex_stream.cpp b/src/stateless/cp/trex_stream.cpp
index ba306137..5203b2a2 100644
--- a/src/stateless/cp/trex_stream.cpp
+++ b/src/stateless/cp/trex_stream.cpp
@@ -25,9 +25,76 @@ limitations under the License.
/**************************************
* stream
*************************************/
-TrexStream::TrexStream(uint8_t port_id, uint32_t stream_id) : m_port_id(port_id), m_stream_id(stream_id) {
+
+
+std::string TrexStream::get_stream_type_str(stream_type_t stream_type){
+
+ std::string res;
+
+
+ switch (stream_type) {
+
+ case stCONTINUOUS :
+ res="stCONTINUOUS ";
+ break;
+
+ case stSINGLE_BURST :
+ res="stSINGLE_BURST ";
+ break;
+
+ case stMULTI_BURST :
+ res="stMULTI_BURST ";
+ break;
+ default:
+ res="Unknow ";
+ };
+ return(res);
+}
+
+
+void TrexStream::Dump(FILE *fd){
+
+ fprintf(fd,"\n");
+ fprintf(fd,"==> Stream_id : %lu \n",(ulong)m_stream_id);
+ fprintf(fd," Enabled : %lu \n",(ulong)(m_enabled?1:0));
+ fprintf(fd," Self_start : %lu \n",(ulong)(m_self_start?1:0));
+
+ if (m_next_stream_id>=0) {
+ fprintf(fd," Nex_stream_id : %lu \n",(ulong)m_next_stream_id);
+ }else {
+ fprintf(fd," Nex_stream_id : %d \n",m_next_stream_id);
+ }
+
+ fprintf(fd," Port_id : %lu \n",(ulong)m_port_id);
+
+ if (m_isg_usec>0.0) {
+ fprintf(fd," isg : %6.2f \n",m_isg_usec);
+ }
+ fprintf(fd," type : %s \n",get_stream_type_str(m_type).c_str());
+
+ if ( m_type == TrexStream::stCONTINUOUS ) {
+ fprintf(fd," pps : %f \n",m_pps);
+ }
+ if (m_type == TrexStream::stSINGLE_BURST) {
+ fprintf(fd," pps : %f \n",m_pps);
+ fprintf(fd," burst : %lu \n",(ulong)m_burst_total_pkts);
+ }
+ if (m_type == TrexStream::stMULTI_BURST) {
+ fprintf(fd," pps : %f \n",m_pps);
+ fprintf(fd," burst : %lu \n",(ulong)m_burst_total_pkts);
+ fprintf(fd," mburst : %lu \n",(ulong)m_num_bursts);
+ if (m_ibg_usec>0.0) {
+ fprintf(fd," m_ibg_usec : %f \n",m_ibg_usec);
+ }
+ }
+}
+
+
+TrexStream::TrexStream(uint8_t type,
+ uint8_t port_id, uint32_t stream_id) : m_port_id(port_id), m_stream_id(stream_id) {
/* default values */
+ m_type = type;
m_isg_usec = 0;
m_next_stream_id = -1;
m_enabled = false;
@@ -38,6 +105,11 @@ TrexStream::TrexStream(uint8_t port_id, uint32_t stream_id) : m_port_id(port_id)
m_rx_check.m_enable = false;
+
+ m_pps=-1.0;
+ m_burst_total_pkts=0;
+ m_num_bursts=1;
+ m_ibg_usec=0.0;
}
TrexStream::~TrexStream() {
diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h
index c8a15240..0634829e 100644
--- a/src/stateless/cp/trex_stream.h
+++ b/src/stateless/cp/trex_stream.h
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -29,9 +30,28 @@ limitations under the License.
#include <json/json.h>
#include <trex_stream_vm.h>
+#include <stdio.h>
+#include <string.h>
class TrexRpcCmdAddStream;
+
+struct CStreamPktData {
+ uint8_t *binary;
+ uint16_t len;
+
+ std::string meta;
+
+public:
+ inline void clone(uint8_t * in_binary,
+ uint32_t in_pkt_size){
+ binary = new uint8_t[in_pkt_size];
+ len = in_pkt_size;
+ memcpy(binary,in_binary,in_pkt_size);
+ }
+};
+
+
/**
* Stateless Stream
*
@@ -39,8 +59,20 @@ class TrexRpcCmdAddStream;
class TrexStream {
public:
- TrexStream(uint8_t port_id, uint32_t stream_id);
- virtual ~TrexStream() = 0;
+ enum STREAM_TYPE {
+ stNONE = 0,
+ stCONTINUOUS = 4,
+ stSINGLE_BURST = 5,
+ stMULTI_BURST = 6
+ };
+
+ typedef uint8_t stream_type_t ;
+
+ static std::string get_stream_type_str(stream_type_t stream_type);
+
+public:
+ TrexStream(uint8_t type,uint8_t port_id, uint32_t stream_id);
+ virtual ~TrexStream();
/* defines the min max per packet supported */
static const uint32_t MIN_PKT_SIZE_BYTES = 1;
@@ -52,10 +84,78 @@ public:
/* access the stream json */
const Json::Value & get_stream_json();
+ /* compress the stream id to be zero based */
+ void fix_dp_stream_id(uint32_t my_stream_id,int next_stream_id){
+ m_stream_id = my_stream_id;
+ m_next_stream_id = next_stream_id;
+ }
+
+ double get_pps() {
+ return m_pps;
+ }
+
+ void set_pps(double pps){
+ m_pps = pps;
+ }
+
+ void set_type(uint8_t type){
+ m_type = type;
+ }
+
+ uint8_t get_type(void) const {
+ return ( m_type );
+ }
+
+ bool is_dp_next_stream(){
+ if (m_next_stream_id<0) {
+ return (false);
+ }else{
+ return (true);
+ }
+ }
+
+
+
+ void set_multi_burst(uint32_t burst_total_pkts,
+ uint32_t num_bursts,
+ double ibg_usec) {
+ m_burst_total_pkts = burst_total_pkts;
+ m_num_bursts = num_bursts;
+ m_ibg_usec = ibg_usec;
+ }
+
+ void set_single_burst(uint32_t burst_total_pkts){
+ set_multi_burst(burst_total_pkts,1,0.0);
+ }
+
+ /* create new stream */
+ TrexStream * clone_as_dp(){
+ TrexStream * dp=new TrexStream(m_type,m_port_id,m_stream_id);
+
+
+ dp->m_isg_usec = m_isg_usec;
+ dp->m_next_stream_id = m_next_stream_id;
+
+ dp->m_enabled = m_enabled;
+ dp->m_self_start = m_self_start;
+
+ /* deep copy */
+ dp->m_pkt.clone(m_pkt.binary,m_pkt.len);
+
+ dp->m_rx_check = m_rx_check;
+ dp->m_pps = m_pps;
+ dp->m_burst_total_pkts = m_burst_total_pkts;
+ dp->m_num_bursts = m_num_bursts;
+ dp->m_ibg_usec = m_ibg_usec ;
+ return (dp);
+ }
+
+ void Dump(FILE *fd);
public:
/* basic */
+ uint8_t m_type;
uint8_t m_port_id;
- uint32_t m_stream_id;
+ uint32_t m_stream_id; /* id from RPC can be anything */
/* config fields */
@@ -65,13 +165,9 @@ public:
/* indicators */
bool m_enabled;
bool m_self_start;
-
+
+ CStreamPktData m_pkt;
/* pkt */
- struct {
- uint8_t *binary;
- uint16_t len;
- std::string meta;
- } m_pkt;
/* VM */
StreamVm m_vm;
@@ -85,64 +181,19 @@ public:
} m_rx_check;
+ double m_pps;
- /* original template provided by requester */
- Json::Value m_stream_json;
-};
+ uint32_t m_burst_total_pkts; /* valid in case of burst stSINGLE_BURST,stMULTI_BURST*/
-/**
- * continuous stream
- *
- */
-class TrexStreamContinuous : public TrexStream {
-public:
- TrexStreamContinuous(uint8_t port_id, uint32_t stream_id, double pps) : TrexStream(port_id, stream_id), m_pps(pps) {
- }
+ uint32_t m_num_bursts; /* valid in case of stMULTI_BURST */
- double get_pps() {
- return m_pps;
- }
+ double m_ibg_usec; /* valid in case of stMULTI_BURST */
-protected:
- double m_pps;
-};
-
-/**
- * single burst
- *
- */
-class TrexStreamBurst : public TrexStream {
-public:
- TrexStreamBurst(uint8_t port_id, uint32_t stream_id, uint32_t total_pkts, double pps) :
- TrexStream(port_id, stream_id),
- m_total_pkts(total_pkts),
- m_pps(pps) {
- }
+ /* original template provided by requester */
+ Json::Value m_stream_json;
-protected:
- uint32_t m_total_pkts;
- double m_pps;
};
-/**
- * multi burst
- *
- */
-class TrexStreamMultiBurst : public TrexStreamBurst {
-public:
- TrexStreamMultiBurst(uint8_t port_id,
- uint32_t stream_id,
- uint32_t pkts_per_burst,
- double pps,
- uint32_t num_bursts,
- double ibg_usec) : TrexStreamBurst(port_id, stream_id, pkts_per_burst, pps), m_num_bursts(num_bursts), m_ibg_usec(ibg_usec) {
-
- }
-protected:
- uint32_t m_num_bursts;
- double m_ibg_usec;
-
-};
/**
* holds all the streams
diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp
index 5e2602ec..302863ae 100644
--- a/src/stateless/cp/trex_streams_compiler.cpp
+++ b/src/stateless/cp/trex_streams_compiler.cpp
@@ -19,42 +19,390 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-#include <string.h>
+#include <string>
+#include <sstream>
#include <trex_streams_compiler.h>
#include <trex_stream.h>
+#include <assert.h>
+#include <trex_stateless.h>
+#include <iostream>
+
+/**
+ * describes a graph node in the pre compile check
+ *
+ * @author imarom (16-Nov-15)
+ */
+class GraphNode {
+public:
+ GraphNode(TrexStream *stream, GraphNode *next) : m_stream(stream), m_next(next) {
+ marked = false;
+ m_compressed_stream_id=-1;
+ }
+
+ uint32_t get_stream_id() const {
+ return m_stream->m_stream_id;
+ }
+
+ const TrexStream *m_stream;
+ GraphNode *m_next;
+ std::vector<const GraphNode *> m_parents;
+ bool marked;
+ int m_compressed_stream_id;
+};
+
+/**
+ * node map
+ *
+ */
+class GraphNodeMap {
+public:
+
+ GraphNodeMap() : m_dead_end(NULL, NULL) {
+
+ }
+
+ bool add(GraphNode *node) {
+ if (has(node->get_stream_id())) {
+ return false;
+ }
+
+ m_nodes[node->get_stream_id()] = node;
+
+ if (node->m_stream->m_self_start) {
+ m_roots.push_back(node);
+ }
+
+ return true;
+ }
+
+ bool has(uint32_t stream_id) {
+
+ return (get(stream_id) != NULL);
+ }
+
+ GraphNode * get(uint32_t stream_id) {
+
+ if (stream_id == -1) {
+ return &m_dead_end;
+ }
+
+ auto search = m_nodes.find(stream_id);
+
+ if (search != m_nodes.end()) {
+ return search->second;
+ } else {
+ return NULL;
+ }
+ }
+
+ void clear_marks() {
+ for (auto node : m_nodes) {
+ node.second->marked = false;
+ }
+ }
+
+ void get_unmarked(std::vector <GraphNode *> &unmarked) {
+ for (auto node : m_nodes) {
+ if (!node.second->marked) {
+ unmarked.push_back(node.second);
+ }
+ }
+ }
+
+
+ ~GraphNodeMap() {
+ for (auto node : m_nodes) {
+ delete node.second;
+ }
+ m_nodes.clear();
+ }
+
+ std::vector <GraphNode *> & get_roots() {
+ return m_roots;
+ }
+
+
+ std::unordered_map<uint32_t, GraphNode *> get_nodes() {
+ return m_nodes;
+ }
+
+private:
+ std::unordered_map<uint32_t, GraphNode *> m_nodes;
+ std::vector <GraphNode *> m_roots;
+ GraphNode m_dead_end;
+};
/**************************************
* stream compiled object
*************************************/
TrexStreamsCompiledObj::TrexStreamsCompiledObj(uint8_t port_id, double mul) : m_port_id(port_id), m_mul(mul) {
+ m_all_continues=false;
}
TrexStreamsCompiledObj::~TrexStreamsCompiledObj() {
- for (auto &obj : m_objs) {
- delete obj.m_pkt;
+ for (auto obj : m_objs) {
+ delete obj.m_stream;
}
m_objs.clear();
}
+
void
-TrexStreamsCompiledObj::add_compiled_stream(double pps, uint8_t *pkt, uint16_t pkt_len) {
+TrexStreamsCompiledObj::add_compiled_stream(TrexStream * stream){
+
obj_st obj;
- obj.m_port_id = m_port_id;
- obj.m_pps = pps * m_mul;
- obj.m_pkt_len = pkt_len;
+ obj.m_stream = stream->clone_as_dp();
+
+ m_objs.push_back(obj);
+}
+
+void
+TrexStreamsCompiledObj::add_compiled_stream(TrexStream * stream,
+ uint32_t my_dp_id, int next_dp_id) {
+ obj_st obj;
- obj.m_pkt = new uint8_t[pkt_len];
- memcpy(obj.m_pkt, pkt, pkt_len);
+ obj.m_stream = stream->clone_as_dp();
+ /* compress the id's*/
+ obj.m_stream->fix_dp_stream_id(my_dp_id,next_dp_id);
m_objs.push_back(obj);
}
+void TrexStreamsCompiledObj::Dump(FILE *fd){
+ for (auto obj : m_objs) {
+ obj.m_stream->Dump(fd);
+ }
+}
+
+
+
+TrexStreamsCompiledObj *
+TrexStreamsCompiledObj::clone() {
+
+ /* use multiplier of 1 to avoid double mult */
+ TrexStreamsCompiledObj *new_compiled_obj = new TrexStreamsCompiledObj(m_port_id, 1);
+
+ /**
+ * clone each element
+ */
+ for (auto obj : m_objs) {
+ new_compiled_obj->add_compiled_stream(obj.m_stream);
+ }
+
+ new_compiled_obj->m_mul = m_mul;
+
+ return new_compiled_obj;
+}
+
+void
+TrexStreamsCompiler::add_warning(const std::string &warning) {
+ m_warnings.push_back("*** warning: " + warning);
+}
+
+void
+TrexStreamsCompiler::err(const std::string &err) {
+ throw TrexException("*** error: " + err);
+}
+
+void
+TrexStreamsCompiler::check_stream(const TrexStream *stream) {
+ std::stringstream ss;
+
+ /* cont. stream can point only on itself */
+ if (stream->get_type() == TrexStream::stCONTINUOUS) {
+ if (stream->m_next_stream_id != -1) {
+ ss << "continous stream '" << stream->m_stream_id << "' cannot point on another stream";
+ err(ss.str());
+ }
+ }
+}
+
+void
+TrexStreamsCompiler::allocate_pass(const std::vector<TrexStream *> &streams,
+ GraphNodeMap *nodes) {
+ std::stringstream ss;
+ uint32_t compressed_stream_id=0;
+
+
+ /* first pass - allocate all nodes and check for duplicates */
+ for (auto stream : streams) {
+
+ /* skip non enabled streams */
+ if (!stream->m_enabled) {
+ continue;
+ }
+
+ /* sanity check on the stream itself */
+ check_stream(stream);
+
+ /* duplicate stream id ? */
+ if (nodes->has(stream->m_stream_id)) {
+ ss << "duplicate instance of stream id " << stream->m_stream_id;
+ err(ss.str());
+ }
+
+ GraphNode *node = new GraphNode(stream, NULL);
+ /* allocate new compressed id */
+ node->m_compressed_stream_id = compressed_stream_id;
+
+ compressed_stream_id++;
+
+ /* add to the map */
+ assert(nodes->add(node));
+ }
+
+}
+
+/**
+ * on this pass we direct the graph to point to the right nodes
+ *
+ */
+void
+TrexStreamsCompiler::direct_pass(GraphNodeMap *nodes) {
+
+ /* second pass - direct the graph */
+ for (auto p : nodes->get_nodes()) {
+
+ GraphNode *node = p.second;
+ const TrexStream *stream = node->m_stream;
+
+ /* check the stream points on an existing stream */
+ GraphNode *next_node = nodes->get(stream->m_next_stream_id);
+ if (!next_node) {
+ std::stringstream ss;
+ ss << "stream " << node->get_stream_id() << " is pointing on non existent stream " << stream->m_next_stream_id;
+ err(ss.str());
+ }
+
+ node->m_next = next_node;
+
+ /* do we have more than one parent ? */
+ next_node->m_parents.push_back(node);
+ }
+
+
+ /* check for multiple parents */
+ for (auto p : nodes->get_nodes()) {
+ GraphNode *node = p.second;
+
+ if (node->m_parents.size() > 0 ) {
+ std::stringstream ss;
+
+ ss << "stream " << node->get_stream_id() << " is triggered by multiple streams: ";
+ for (auto x : node->m_parents) {
+ ss << x->get_stream_id() << " ";
+ }
+
+ add_warning(ss.str());
+ }
+ }
+}
+
+/**
+ * mark sure all the streams are reachable
+ *
+ */
+void
+TrexStreamsCompiler::check_for_unreachable_streams(GraphNodeMap *nodes) {
+ /* start with the roots */
+ std::vector <GraphNode *> next_nodes = nodes->get_roots();
+
+
+ nodes->clear_marks();
+
+ /* run BFS from all the roots */
+ while (!next_nodes.empty()) {
+
+ /* pull one */
+ GraphNode *node = next_nodes.back();
+ next_nodes.pop_back();
+ if (node->marked) {
+ continue;
+ }
+
+ node->marked = true;
+
+ if (node->m_next != NULL) {
+ next_nodes.push_back(node->m_next);
+ }
+
+ }
+
+ std::vector <GraphNode *> unmarked;
+ nodes->get_unmarked(unmarked);
+
+ if (!unmarked.empty()) {
+ std::stringstream ss;
+ for (auto node : unmarked) {
+ ss << "stream " << node->get_stream_id() << " is unreachable from any other stream\n";
+ }
+ err(ss.str());
+ }
+
+
+}
+
+/**
+ * check validation of streams for compile
+ *
+ * @author imarom (16-Nov-15)
+ *
+ * @param streams
+ * @param fail_msg
+ *
+ * @return bool
+ */
+void
+TrexStreamsCompiler::pre_compile_check(const std::vector<TrexStream *> &streams,
+ GraphNodeMap & nodes) {
+
+ m_warnings.clear();
+
+ /* allocate nodes */
+ allocate_pass(streams, &nodes);
+
+ /* direct the graph */
+ direct_pass(&nodes);
+
+ /* check for non reachable streams inside the graph */
+ check_for_unreachable_streams(&nodes);
+
+}
+
/**************************************
* stream compiler
*************************************/
bool
-TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj) {
+TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams,
+ TrexStreamsCompiledObj &obj,
+ std::string *fail_msg) {
+
+#if 0
+ fprintf(stdout,"------------pre compile \n");
+ for (auto stream : streams) {
+ stream->Dump(stdout);
+ }
+ fprintf(stdout,"------------pre compile \n");
+#endif
+
+ GraphNodeMap nodes;
+
+
+ /* compile checks */
+ try {
+ pre_compile_check(streams,nodes);
+ } catch (const TrexException &ex) {
+ if (fail_msg) {
+ *fail_msg = ex.what();
+ } else {
+ std::cout << ex.what();
+ }
+ return false;
+ }
+
+
+ bool all_continues=true;
/* for now we do something trivial, */
for (auto stream : streams) {
@@ -62,24 +410,26 @@ TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams, TrexStrea
if (!stream->m_enabled) {
continue;
}
-
- /* for now skip also non self started streams */
- if (!stream->m_self_start) {
- continue;
+ if (stream->get_type() != TrexStream::stCONTINUOUS ) {
+ all_continues=false;
}
- /* for now support only continous ... */
- TrexStreamContinuous *cont_stream = dynamic_cast<TrexStreamContinuous *>(stream);
- if (!cont_stream) {
- continue;
+ int new_id= nodes.get(stream->m_stream_id)->m_compressed_stream_id;
+ assert(new_id>=0);
+ uint32_t my_stream_id = (uint32_t)new_id;
+ int my_next_stream_id=-1;
+ if (stream->m_next_stream_id>=0) {
+ my_next_stream_id=nodes.get(stream->m_next_stream_id)->m_compressed_stream_id;
}
/* add it */
- obj.add_compiled_stream(cont_stream->get_pps(),
- cont_stream->m_pkt.binary,
- cont_stream->m_pkt.len);
+ obj.add_compiled_stream(stream,
+ my_stream_id,
+ my_next_stream_id
+ );
}
-
+ obj.m_all_continues =all_continues;
return true;
}
+
diff --git a/src/stateless/cp/trex_streams_compiler.h b/src/stateless/cp/trex_streams_compiler.h
index 06f992ed..17ca3c74 100644
--- a/src/stateless/cp/trex_streams_compiler.h
+++ b/src/stateless/cp/trex_streams_compiler.h
@@ -23,9 +23,11 @@ limitations under the License.
#include <stdint.h>
#include <vector>
+#include <string>
class TrexStreamsCompiler;
class TrexStream;
+class GraphNodeMap;
/**
* compiled object for a table of streams
@@ -40,33 +42,78 @@ public:
~TrexStreamsCompiledObj();
struct obj_st {
- double m_pps;
- uint8_t *m_pkt;
- uint16_t m_pkt_len;
- uint8_t m_port_id;
+
+ TrexStream * m_stream;
};
const std::vector<obj_st> & get_objects() {
return m_objs;
}
+ uint8_t get_port_id(){
+ return (m_port_id);
+ }
+
+ /**
+ * clone the compiled object
+ *
+ */
+ TrexStreamsCompiledObj * clone();
+
+ double get_multiplier(){
+ return (m_mul);
+ }
+
+ bool get_all_streams_continues(){
+ return (m_all_continues);
+ }
+
+ void Dump(FILE *fd);
+
private:
- void add_compiled_stream(double pps, uint8_t *pkt, uint16_t pkt_len);
+ void add_compiled_stream(TrexStream * stream,
+ uint32_t my_dp_id, int next_dp_id);
+ void add_compiled_stream(TrexStream * stream);
+
std::vector<obj_st> m_objs;
+ bool m_all_continues;
uint8_t m_port_id;
double m_mul;
};
class TrexStreamsCompiler {
public:
+
/**
* compiles a vector of streams to an object passable to the DP
*
* @author imarom (28-Oct-15)
*
*/
- bool compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj);
+ bool compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj, std::string *fail_msg = NULL);
+
+ /**
+ *
+ * returns a reference pointer to the last compile warnings
+ * if no warnings were produced - the vector is empty
+ */
+ const std::vector<std::string> & get_last_compile_warnings() {
+ return m_warnings;
+ }
+
+private:
+
+ void pre_compile_check(const std::vector<TrexStream *> &streams,
+ GraphNodeMap & nodes);
+ void allocate_pass(const std::vector<TrexStream *> &streams, GraphNodeMap *nodes);
+ void direct_pass(GraphNodeMap *nodes);
+ void check_for_unreachable_streams(GraphNodeMap *nodes);
+ void check_stream(const TrexStream *stream);
+ void add_warning(const std::string &warning);
+ void err(const std::string &err);
+
+ std::vector<std::string> m_warnings;
};
#endif /* __TREX_STREAMS_COMPILER_H__ */
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index 306b23d0..9b4a6ad9 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -22,12 +23,193 @@ limitations under the License.
#include <trex_stateless_messaging.h>
#include <trex_streams_compiler.h>
#include <trex_stream_node.h>
+#include <trex_stream.h>
#include <bp_sim.h>
-TrexStatelessDpCore::TrexStatelessDpCore(uint8_t thread_id, CFlowGenListPerThread *core) {
+
+void CDpOneStream::Delete(CFlowGenListPerThread * core){
+ assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
+ core->free_node((CGenNode *)m_node);
+ delete m_dp_stream;
+ m_node=0;
+ m_dp_stream=0;
+}
+
+void CDpOneStream::DeleteOnlyStream(){
+ assert(m_dp_stream);
+ delete m_dp_stream;
+ m_dp_stream=0;
+}
+
+int CGenNodeStateless::get_stream_id(){
+ if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
+ return (-1); // not valid
+ }
+ assert(m_ref_stream_info);
+ return ((int)m_ref_stream_info->m_stream_id);
+}
+
+
+void CGenNodeStateless::DumpHeader(FILE *fd){
+ fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
+
+}
+void CGenNodeStateless::Dump(FILE *fd){
+ fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu \n",
+ m_time,
+ (ulong)m_port_id,
+ "s-pkt", //action
+ get_stream_state_str(m_state ).c_str(),
+ get_stream_id(), //stream_id
+ TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
+ (ulong)m_multi_bursts,
+ (ulong)m_single_burst
+ );
+}
+
+
+void CGenNodeStateless::refresh(){
+
+ /* refill the stream info */
+ m_single_burst = m_single_burst_refill;
+ m_multi_bursts = m_ref_stream_info->m_num_bursts;
+ m_state = CGenNodeStateless::ss_ACTIVE;
+}
+
+
+void CGenNodeCommand::free_command(){
+
+ assert(m_cmd);
+ m_cmd->on_node_remove();
+ delete m_cmd;
+}
+
+
+std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
+ std::string res;
+
+ switch (stream_state) {
+ case CGenNodeStateless::ss_FREE_RESUSE :
+ res="FREE ";
+ break;
+ case CGenNodeStateless::ss_INACTIVE :
+ res="INACTIVE ";
+ break;
+ case CGenNodeStateless::ss_ACTIVE :
+ res="ACTIVE ";
+ break;
+ default:
+ res="Unknow ";
+ };
+ return(res);
+}
+
+
+void CGenNodeStateless::free_stl_node(){
+ /* if we have cache mbuf free it */
+ rte_mbuf_t * m=get_cache_mbuf();
+ if (m) {
+ rte_pktmbuf_free(m);
+ m_cache_mbuf=0;
+ }
+}
+
+
+bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
+ m_active_streams-=d; /* reduce the number of streams */
+ if (m_active_streams == 0) {
+ return (true);
+ }
+ return (false);
+}
+
+bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
+
+ /* we are working with continues streams so we must be in transmit mode */
+ assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
+
+ for (auto dp_stream : m_active_nodes) {
+ CGenNodeStateless * node =dp_stream.m_node;
+ assert(node->get_port_id() == port_id);
+ assert(node->is_pause() == true);
+ node->set_pause(false);
+ }
+ m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
+ return (true);
+}
+
+
+bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
+
+ /* we are working with continues streams so we must be in transmit mode */
+ assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
+
+ for (auto dp_stream : m_active_nodes) {
+ CGenNodeStateless * node =dp_stream.m_node;
+ assert(node->get_port_id() == port_id);
+ assert(node->is_pause() == false);
+ node->set_pause(true);
+ }
+ m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
+ return (true);
+}
+
+
+bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
+ bool stop_on_id,
+ int event_id){
+
+
+ if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
+ assert(m_active_streams==0);
+ return false;
+ }
+
+ /* there could be race of stop after stop */
+ if ( stop_on_id ) {
+ if (event_id != m_event_id){
+ /* we can't stop it is an old message */
+ return false;
+ }
+ }
+
+ for (auto dp_stream : m_active_nodes) {
+ CGenNodeStateless * node =dp_stream.m_node;
+ assert(node->get_port_id() == port_id);
+ if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
+ node->mark_for_free();
+ m_active_streams--;
+ dp_stream.DeleteOnlyStream();
+
+ }else{
+ dp_stream.Delete(m_core);
+ }
+ }
+
+ /* active stream should be zero */
+ assert(m_active_streams==0);
+ m_active_nodes.clear();
+ m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
+ return (true);
+}
+
+
+void TrexStatelessDpPerPort::create(CFlowGenListPerThread * core){
+ m_core=core;
+ m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
+ m_port_id=0;
+ m_active_streams=0;
+ m_active_nodes.clear();
+}
+
+
+
+void
+TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
m_thread_id = thread_id;
m_core = core;
+ m_local_port_offset = 2*core->getDualPortId();
CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
@@ -35,8 +217,54 @@ TrexStatelessDpCore::TrexStatelessDpCore(uint8_t thread_id, CFlowGenListPerThrea
m_ring_to_cp = cp_dp->getRingDpToCp(thread_id);
m_state = STATE_IDLE;
+
+ int i;
+ for (i=0; i<NUM_PORTS_PER_CORE; i++) {
+ m_ports[i].create(core);
+ }
}
+
+/* move to the next stream, old stream move to INACTIVE */
+bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
+ CGenNodeStateless * next_node){
+
+ assert(cur_node);
+ TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
+ bool schedule =false;
+
+ bool to_stop_port=false;
+
+ if (next_node == NULL) {
+ /* there is no next stream , reduce the number of active streams*/
+ to_stop_port = lp_port->update_number_of_active_streams(1);
+
+ }else{
+ uint8_t state=next_node->get_state();
+
+ /* can't be FREE_RESUSE */
+ assert(state != CGenNodeStateless::ss_FREE_RESUSE);
+ if (next_node->get_state() == CGenNodeStateless::ss_INACTIVE ) {
+
+ /* refill start info and scedule, no update in active streams */
+ next_node->refresh();
+ schedule = true;
+
+ }else{
+ to_stop_port = lp_port->update_number_of_active_streams(1);
+ }
+ }
+
+ if ( to_stop_port ) {
+ /* call stop port explictly to move the state */
+ stop_traffic(cur_node->m_port_id,false,0);
+ }
+
+ return ( schedule );
+}
+
+
+
/**
* in idle state loop, the processor most of the time sleeps
* and periodically checks for messages
@@ -52,6 +280,15 @@ TrexStatelessDpCore::idle_state_loop() {
}
}
+
+
+void TrexStatelessDpCore::quit_main_loop(){
+ m_core->set_terminate_mode(true); /* mark it as terminated */
+ m_state = STATE_TERMINATE;
+ add_global_duration(0.0001);
+}
+
+
/**
* scehduler runs when traffic exists
* it will return when no more transmitting is done on this
@@ -68,37 +305,172 @@ TrexStatelessDpCore::start_scheduler() {
m_core->m_node_gen.add_node(node_sync);
double old_offset = 0.0;
- m_core->m_node_gen.flush_file(100000000, 0.0, false, m_core, old_offset);
+ m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
+ /* bail out in case of terminate */
+ if (m_state != TrexStatelessDpCore::STATE_TERMINATE) {
+ m_core->m_node_gen.close_file(m_core);
+ }
}
+
+void
+TrexStatelessDpCore::run_once(){
+
+ idle_state_loop();
+
+ if ( m_state == STATE_TERMINATE ){
+ return;
+ }
+
+ start_scheduler();
+}
+
+
+
+
void
TrexStatelessDpCore::start() {
while (true) {
- idle_state_loop();
+ run_once();
+
+ if ( m_core->is_terminated_by_master() ) {
+ break;
+ }
+ }
+}
+
+/* only if both port are idle we can exit */
+void
+TrexStatelessDpCore::schedule_exit(){
+
+ CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
+
+ node->m_type = CGenNode::COMMAND;
- start_scheduler();
+ node->m_cmd = new TrexStatelessDpCanQuit();
+
+ /* make sure it will be scheduled after the current node */
+ node->m_time = m_core->m_cur_time_sec ;
+
+ m_core->m_node_gen.add_node((CGenNode *)node);
+}
+
+
+void
+TrexStatelessDpCore::add_global_duration(double duration){
+ if (duration > 0.0) {
+ CGenNode *node = m_core->create_node() ;
+
+ node->m_type = CGenNode::EXIT_SCHED;
+
+ /* make sure it will be scheduled after the current node */
+ node->m_time = m_core->m_cur_time_sec + duration ;
+
+ m_core->m_node_gen.add_node(node);
}
}
+/* add per port exit */
void
-TrexStatelessDpCore::add_cont_stream(double pps, const uint8_t *pkt, uint16_t pkt_len) {
+TrexStatelessDpCore::add_port_duration(double duration,
+ uint8_t port_id,
+ int event_id){
+ if (duration > 0.0) {
+ CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
+
+ node->m_type = CGenNode::COMMAND;
+
+ /* make sure it will be scheduled after the current node */
+ node->m_time = m_core->m_cur_time_sec + duration ;
+
+ TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
+
+
+ /* test this */
+ m_core->m_non_active_nodes++;
+ cmd->set_core_ptr(m_core);
+ cmd->set_event_id(event_id);
+ cmd->set_wait_for_event_id(true);
+
+ node->m_cmd = cmd;
+
+ m_core->m_node_gen.add_node((CGenNode *)node);
+ }
+}
+
+
+void
+TrexStatelessDpCore::add_cont_stream(TrexStatelessDpPerPort * lp_port,
+ TrexStream * stream,
+ TrexStreamsCompiledObj *comp) {
+
CGenNodeStateless *node = m_core->create_node_sl();
/* add periodic */
node->m_type = CGenNode::STATELESS_PKT;
- node->m_time = m_core->m_cur_time_sec + 0.0 /* STREAM ISG */;
+
+ node->m_ref_stream_info = stream->clone_as_dp();
+
+ node->m_next_stream=0; /* will be fixed later */
+
+
+ if ( stream->m_self_start ){
+ /* if self start it is in active mode */
+ node->m_state =CGenNodeStateless::ss_ACTIVE;
+ lp_port->m_active_streams++;
+ }else{
+ node->m_state =CGenNodeStateless::ss_INACTIVE;
+ }
+
+ node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);
+
+ pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
node->m_flags = 0;
/* set socket id */
node->set_socket_id(m_core->m_node_gen.m_socket_id);
/* build a mbuf from a packet */
- uint16_t pkt_size = pkt_len;
- const uint8_t *stream_pkt = pkt;
+
+ uint16_t pkt_size = stream->m_pkt.len;
+ const uint8_t *stream_pkt = stream->m_pkt.binary;
+
+ node->m_pause =0;
+ node->m_stream_type = stream->m_type;
+ node->m_next_time_offset = 1.0 / (stream->get_pps() * comp->get_multiplier()) ;
+
+
+ /* stateless specific fields */
+ switch ( stream->m_type ) {
+
+ case TrexStream::stCONTINUOUS :
+ node->m_single_burst=0;
+ node->m_single_burst_refill=0;
+ node->m_multi_bursts=0;
+ node->m_ibg_sec = 0.0;
+ break;
- node->m_next_time_offset = 1.0 / pps;
- node->m_is_stream_active = 1;
+ case TrexStream::stSINGLE_BURST :
+ node->m_stream_type = TrexStream::stMULTI_BURST;
+ node->m_single_burst = stream->m_burst_total_pkts;
+ node->m_single_burst_refill = stream->m_burst_total_pkts;
+ node->m_multi_bursts = 1; /* single burst in multi burst of 1 */
+ node->m_ibg_sec = 0.0;
+ break;
+
+ case TrexStream::stMULTI_BURST :
+ node->m_single_burst = stream->m_burst_total_pkts;
+ node->m_single_burst_refill = stream->m_burst_total_pkts;
+ node->m_multi_bursts = stream->m_num_bursts;
+ node->m_ibg_sec = usec_to_sec( stream->m_ibg_usec );
+ break;
+ default:
+
+ assert(0);
+ };
+
+ node->m_port_id = stream->m_port_id;
/* allocate const mbuf */
rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size);
@@ -110,7 +482,6 @@ TrexStatelessDpCore::add_cont_stream(double pps, const uint8_t *pkt, uint16_t pk
memcpy(p,stream_pkt,pkt_size);
/* set dir 0 or 1 client or server */
- pkt_dir_t dir = 0;
node->set_mbuf_cache_dir(dir);
/* TBD repace the mac if req we should add flag */
@@ -119,55 +490,130 @@ TrexStatelessDpCore::add_cont_stream(double pps, const uint8_t *pkt, uint16_t pk
/* set the packet as a readonly */
node->set_cache_mbuf(m);
- /* keep track */
- m_active_nodes.push_back(node);
+ CDpOneStream one_stream;
- /* schedule */
- m_core->m_node_gen.add_node((CGenNode *)node);
+ one_stream.m_dp_stream = node->m_ref_stream_info;
+ one_stream.m_node =node;
- m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
+ lp_port->m_active_nodes.push_back(one_stream);
+ /* schedule only if active */
+ if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
+ m_core->m_node_gen.add_node((CGenNode *)node);
+ }
}
void
-TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj) {
+TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
+ double duration,
+ int event_id) {
+
+
+ TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
+ lp_port->m_active_streams = 0;
+ lp_port->set_event_id(event_id);
+
+ /* no nodes in the list */
+ assert(lp_port->m_active_nodes.size()==0);
+
+ for (auto single_stream : obj->get_objects()) {
+ /* all commands should be for the same port */
+ assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
+ add_cont_stream(lp_port,single_stream.m_stream,obj);
+ }
+
+ uint32_t nodes = lp_port->m_active_nodes.size();
+ /* find next stream */
+ assert(nodes == obj->get_objects().size());
+
+ int cnt=0;
+
+ /* set the next_stream pointer */
for (auto single_stream : obj->get_objects()) {
- add_cont_stream(single_stream.m_pps, single_stream.m_pkt, single_stream.m_pkt_len);
+
+ if (single_stream.m_stream->is_dp_next_stream() ) {
+ int stream_id = single_stream.m_stream->m_next_stream_id;
+ assert(stream_id<nodes);
+ /* point to the next stream , stream_id is fixed */
+ lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
+ }
+ cnt++;
+ }
+
+ lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
+ m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
+
+
+ if ( duration > 0.0 ){
+ add_port_duration( duration ,obj->get_port_id(),event_id );
}
+
}
-void
-TrexStatelessDpCore::stop_traffic(uint8_t port_id) {
- /* we cannot remove nodes not from the top of the queue so
- for every active node - make sure next time
- the scheduler invokes it, it will be free */
- for (auto node : m_active_nodes) {
- if (node->m_port_id == port_id) {
- node->m_is_stream_active = 0;
+
+bool TrexStatelessDpCore::are_all_ports_idle(){
+
+ bool res=true;
+ int i;
+ for (i=0; i<NUM_PORTS_PER_CORE; i++) {
+ if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
+ res=false;
}
}
+ return (res);
+}
- /* remove all the non active nodes */
- auto pred = std::remove_if(m_active_nodes.begin(),
- m_active_nodes.end(),
- [](CGenNodeStateless *node) { return (!node->m_is_stream_active); });
- m_active_nodes.erase(pred, m_active_nodes.end());
+void
+TrexStatelessDpCore::resume_traffic(uint8_t port_id){
+
+ TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
+
+ lp_port->resume_traffic(port_id);
+}
- if (m_active_nodes.size() == 0) {
- m_state = STATE_IDLE;
- /* stop the scheduler */
- CGenNode *node = m_core->create_node() ;
+void
+TrexStatelessDpCore::pause_traffic(uint8_t port_id){
- node->m_type = CGenNode::EXIT_SCHED;
+ TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
- /* make sure it will be scheduled after the current node */
- node->m_time = m_core->m_node_gen.m_p_queue.top()->m_time;
+ lp_port->pause_traffic(port_id);
+}
- m_core->m_node_gen.add_node(node);
+
+void
+TrexStatelessDpCore::stop_traffic(uint8_t port_id,
+ bool stop_on_id,
+ int event_id) {
+ /* we cannot remove nodes not from the top of the queue so
+ for every active node - make sure next time
+ the scheduler invokes it, it will be free */
+
+ TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
+
+ if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
+ /* nothing to do ! already stopped */
+ //printf(" skip .. %f\n",m_core->m_cur_time_sec);
+ return;
}
-
+
+#if 0
+ if ( are_all_ports_idle() ) {
+ /* just a place holder if we will need to do somthing in that case */
+ }
+#endif
+
+ /* inform the control plane we stopped - this might be a async stop
+ (streams ended)
+ */
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
+ TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
+ port_id,
+ TrexDpPortEvent::EVENT_STOP,
+ lp_port->get_event_id());
+ ring->Enqueue((CGenNode *)event_msg);
+
}
/**
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index 698cac2f..eda1ae59 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -31,6 +32,74 @@ class TrexStatelessDpStart;
class CFlowGenListPerThread;
class CGenNodeStateless;
class TrexStreamsCompiledObj;
+class TrexStream;
+
+
+class CDpOneStream {
+public:
+ void Create(){
+ }
+
+ void Delete(CFlowGenListPerThread * core);
+ void DeleteOnlyStream();
+
+ CGenNodeStateless * m_node; // schedule node
+ TrexStream * m_dp_stream; // stream info
+};
+
+class TrexStatelessDpPerPort {
+
+public:
+ /* states */
+ enum state_e {
+ ppSTATE_IDLE,
+ ppSTATE_TRANSMITTING,
+ ppSTATE_PAUSE
+
+ };
+
+public:
+ TrexStatelessDpPerPort(){
+ }
+
+ void create(CFlowGenListPerThread * core);
+
+ bool pause_traffic(uint8_t port_id);
+
+ bool resume_traffic(uint8_t port_id);
+
+ bool stop_traffic(uint8_t port_id,
+ bool stop_on_id,
+ int event_id);
+
+ bool update_number_of_active_streams(uint32_t d);
+
+ state_e get_state() {
+ return m_state;
+ }
+
+ void set_event_id(int event_id) {
+ m_event_id = event_id;
+ }
+
+ int get_event_id() {
+ return m_event_id;
+ }
+
+public:
+
+ state_e m_state;
+ uint8_t m_port_id;
+
+ uint32_t m_active_streams; /* how many active streams on this port */
+
+ std::vector<CDpOneStream> m_active_nodes; /* holds the current active nodes */
+ CFlowGenListPerThread * m_core ;
+ int m_event_id;
+};
+
+/* for now */
+#define NUM_PORTS_PER_CORE 2
class TrexStatelessDpCore {
@@ -39,10 +108,24 @@ public:
/* states */
enum state_e {
STATE_IDLE,
- STATE_TRANSMITTING
+ STATE_TRANSMITTING,
+ STATE_TERMINATE
+
};
- TrexStatelessDpCore(uint8_t thread_id, CFlowGenListPerThread *core);
+ TrexStatelessDpCore() {
+ m_thread_id = 0;
+ m_core = NULL;
+ m_duration = -1;
+ }
+
+ /**
+ * "static constructor"
+ *
+ * @param thread_id
+ * @param core
+ */
+ void create(uint8_t thread_id, CFlowGenListPerThread *core);
/**
* launch the stateless DP core code
@@ -50,6 +133,10 @@ public:
*/
void start();
+
+ /* exit after batch of commands */
+ void run_once();
+
/**
* dummy traffic creator
*
@@ -58,13 +145,30 @@ public:
* @param pkt
* @param pkt_len
*/
- void start_traffic(TrexStreamsCompiledObj *obj);
+ void start_traffic(TrexStreamsCompiledObj *obj,
+ double duration,
+ int m_event_id);
+
+
+ /* pause the streams, work only if all are continues */
+ void pause_traffic(uint8_t port_id);
+
+
+
+ void resume_traffic(uint8_t port_id);
+
/**
+ *
* stop all traffic for this core
*
*/
- void stop_traffic(uint8_t port_id);
+ void stop_traffic(uint8_t port_id,bool stop_on_id, int event_id);
+
+
+ /* return if all ports are idel */
+ bool are_all_ports_idle();
+
/**
* check for and handle messages from CP
@@ -92,7 +196,31 @@ public:
}
+ /* quit the main loop, work in both stateless in stateful, don't free memory trigger from master */
+ void quit_main_loop();
+
+ state_e get_state() {
+ return m_state;
+ }
+
+ bool set_stateless_next_node(CGenNodeStateless * cur_node,
+ CGenNodeStateless * next_node);
+
+
+ TrexStatelessDpPerPort * get_port_db(uint8_t port_id){
+ assert((m_local_port_offset==port_id) ||(m_local_port_offset+1==port_id));
+ uint8_t local_port_id = port_id -m_local_port_offset;
+ assert(local_port_id<NUM_PORTS_PER_CORE);
+ return (&m_ports[local_port_id]);
+ }
+
+
+
private:
+
+ void schedule_exit();
+
+
/**
* in idle state loop, the processor most of the time sleeps
* and periodically checks for messages
@@ -115,18 +243,30 @@ private:
*/
void handle_cp_msg(TrexStatelessCpToDpMsgBase *msg);
- void add_cont_stream(double pps, const uint8_t *pkt, uint16_t pkt_len);
+
+ void add_port_duration(double duration,
+ uint8_t port_id,
+ int event_id);
+
+ void add_global_duration(double duration);
+
+ void add_cont_stream(TrexStatelessDpPerPort * lp_port,
+ TrexStream * stream,
+ TrexStreamsCompiledObj *comp);
uint8_t m_thread_id;
- state_e m_state;
+ uint8_t m_local_port_offset;
+
+ state_e m_state; /* state of all ports */
CNodeRing *m_ring_from_cp;
CNodeRing *m_ring_to_cp;
- /* holds the current active nodes */
- std::vector<CGenNodeStateless *> m_active_nodes;
+ TrexStatelessDpPerPort m_ports[NUM_PORTS_PER_CORE];
/* pointer to the main object */
- CFlowGenListPerThread *m_core;
+ CFlowGenListPerThread * m_core;
+
+ double m_duration;
};
#endif /* __TREX_STATELESS_DP_CORE_H__ */
diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h
index 92b428ab..ccf99eaa 100644
--- a/src/stateless/dp/trex_stream_node.h
+++ b/src/stateless/dp/trex_stream_node.h
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoh Haim
Cisco Systems, Inc.
*/
@@ -22,44 +23,201 @@ limitations under the License.
#define __TREX_STREAM_NODE_H__
#include <bp_sim.h>
+#include <stdio.h>
class TrexStatelessDpCore;
+#include <trex_stream.h>
+
+class TrexStatelessCpToDpMsgBase;
+class CFlowGenListPerThread;
+
+struct CGenNodeCommand : public CGenNodeBase {
+
+friend class TrexStatelessDpCore;
+
+public:
+ TrexStatelessCpToDpMsgBase * m_cmd;
+
+ uint8_t m_pad_end[104];
+
+public:
+ void free_command();
+
+} __rte_cache_aligned;;
+
+
+static_assert(sizeof(CGenNodeCommand) == sizeof(CGenNode), "sizeof(CGenNodeCommand) != sizeof(CGenNode)" );
+
/* this is a event for stateless */
struct CGenNodeStateless : public CGenNodeBase {
friend class TrexStatelessDpCore;
+public:
+ enum {
+ ss_FREE_RESUSE =1, /* should be free by scheduler */
+ ss_INACTIVE =2, /* will be active by other stream or stopped */
+ ss_ACTIVE =3 /* the stream is active */
+ };
+ typedef uint8_t stream_state_t ;
+
+ static std::string get_stream_state_str(stream_state_t stream_state);
+
private:
+ /* cache line 0 */
+ /* important stuff here */
void * m_cache_mbuf;
- double m_next_time_offset;
- uint8_t m_is_stream_active;
+ double m_next_time_offset; /* in sec */
+ double m_ibg_sec; /* inter burst time in sec */
+
+
+ stream_state_t m_state;
uint8_t m_port_id;
+ uint8_t m_stream_type; /* see TrexStream::STREAM_TYPE ,stream_type_t */
+ uint8_t m_pause;
+
+ uint32_t m_single_burst; /* the number of bursts in case of burst */
+ uint32_t m_single_burst_refill;
+
+ uint32_t m_multi_bursts; /* in case of multi_burst how many bursts */
+
+ /* cache line 1 */
+ TrexStream * m_ref_stream_info; /* the stream info */
+ CGenNodeStateless * m_next_stream;
/* pad to match the size of CGenNode */
- uint8_t m_pad_end[87];
+ uint8_t m_pad_end[56];
+
+
public:
- inline bool is_active() {
- return m_is_stream_active;
+ uint8_t get_port_id(){
+ return (m_port_id);
+ }
+
+
+ /* we restart the stream, schedule it using stream isg */
+ inline void update_refresh_time(double cur_time){
+ m_time = cur_time + usec_to_sec(m_ref_stream_info->m_isg_usec);
+ }
+
+ inline bool is_mask_for_free(){
+ return (get_state() == CGenNodeStateless::ss_FREE_RESUSE ?true:false);
+
+ }
+ inline void mark_for_free(){
+ set_state(CGenNodeStateless::ss_FREE_RESUSE);
+ /* only to be safe */
+ m_ref_stream_info= NULL;
+ m_next_stream= NULL;
+ }
+
+ bool is_pause(){
+ return (m_pause==1?true:false);
+ }
+
+ void set_pause(bool enable){
+ if ( enable ){
+ m_pause=1;
+ }else{
+ m_pause=0;
+ }
+ }
+
+ inline uint8_t get_stream_type(){
+ return (m_stream_type);
+ }
+
+ inline uint32_t get_single_burst_cnt(){
+ return (m_single_burst);
+ }
+
+ inline double get_multi_ibg_sec(){
+ return (m_ibg_sec);
+ }
+
+ inline uint32_t get_multi_burst_cnt(){
+ return (m_multi_bursts);
+ }
+
+ inline void set_state(stream_state_t new_state){
+ m_state=new_state;
+ }
+
+
+ inline stream_state_t get_state() {
+ return m_state;
+ }
+
+ void refresh();
+
+ inline void handle_continues(CFlowGenListPerThread *thread) {
+
+ if (unlikely (is_pause()==false)) {
+ thread->m_node_gen.m_v_if->send_node( (CGenNode *)this);
+ }
+
+ /* in case of continues */
+ m_time += m_next_time_offset;
+
+ /* insert a new event */
+ thread->m_node_gen.m_p_queue.push( (CGenNode *)this);
+ }
+
+ inline void handle_multi_burst(CFlowGenListPerThread *thread) {
+ thread->m_node_gen.m_v_if->send_node( (CGenNode *)this);
+
+ m_single_burst--;
+ if (m_single_burst > 0 ) {
+ /* in case of continues */
+ m_time += m_next_time_offset;
+
+ thread->m_node_gen.m_p_queue.push( (CGenNode *)this);
+ }else{
+ m_multi_bursts--;
+ if ( m_multi_bursts == 0 ) {
+ set_state(CGenNodeStateless::ss_INACTIVE);
+ if ( thread->set_stateless_next_node(this,m_next_stream) ){
+ /* update the next stream time using isg */
+ m_next_stream->update_refresh_time(m_time);
+
+ thread->m_node_gen.m_p_queue.push( (CGenNode *)m_next_stream);
+ }else{
+ // in case of zero we will schedule a command to stop
+ // will be called from set_stateless_next_node
+ }
+
+ }else{
+ m_time += m_ibg_sec;
+ m_single_burst = m_single_burst_refill;
+ thread->m_node_gen.m_p_queue.push( (CGenNode *)this);
+ }
+ }
}
/**
* main function to handle an event of a packet tx
*
+ *
+ *
*/
- inline void handle(CFlowGenListPerThread *thread) {
- thread->m_node_gen.m_v_if->send_node( (CGenNode *)this);
+ inline void handle(CFlowGenListPerThread *thread) {
- /* in case of continues */
- m_time += m_next_time_offset;
+ if (m_stream_type == TrexStream::stCONTINUOUS ) {
+ handle_continues(thread) ;
+ }else{
+ if (m_stream_type == TrexStream::stMULTI_BURST) {
+ handle_multi_burst(thread);
+ }else{
+ assert(0);
+ }
+ }
- /* insert a new event */
- thread->m_node_gen.m_p_queue.push( (CGenNode *)this);
}
void set_socket_id(socket_id_t socket){
@@ -82,8 +240,6 @@ public:
return ((pkt_dir_t)( m_flags &1));
}
-
-
inline void set_cache_mbuf(rte_mbuf_t * m){
m_cache_mbuf=(void *)m;
m_flags |= NODE_FLAGS_MBUF_CACHE;
@@ -97,9 +253,22 @@ public:
}
}
+ void free_stl_node();
+
+public:
+ /* debug functions */
+
+ int get_stream_id();
+
+ static void DumpHeader(FILE *fd);
+
+ void Dump(FILE *fd);
} __rte_cache_aligned;
-static_assert(sizeof(CGenNodeStateless) == sizeof(CGenNode), "sizeof(CGenNodeStateless) != sizeof(CGenNode)");
+static_assert(sizeof(CGenNodeStateless) == sizeof(CGenNode), "sizeof(CGenNodeStateless) != sizeof(CGenNode)" );
+
+
+
#endif /* __TREX_STREAM_NODE_H__ */
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index 3e754649..ec8b7839 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -21,12 +22,34 @@ limitations under the License.
#include <trex_stateless_messaging.h>
#include <trex_stateless_dp_core.h>
#include <trex_streams_compiler.h>
+#include <trex_stateless.h>
+#include <bp_sim.h>
+
#include <string.h>
/*************************
start traffic message
************************/
-TrexStatelessDpStart::TrexStatelessDpStart(TrexStreamsCompiledObj *obj) : m_obj(obj) {
+TrexStatelessDpStart::TrexStatelessDpStart(uint8_t port_id, int event_id, TrexStreamsCompiledObj *obj, double duration) {
+ m_port_id = port_id;
+ m_event_id = event_id;
+ m_obj = obj;
+ m_duration = duration;
+}
+
+
+/**
+ * clone for DP start message
+ *
+ */
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpStart::clone() {
+
+ TrexStreamsCompiledObj *new_obj = m_obj->clone();
+
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpStart(m_port_id, m_event_id, new_obj, m_duration);
+
+ return new_msg;
}
TrexStatelessDpStart::~TrexStatelessDpStart() {
@@ -38,7 +61,9 @@ TrexStatelessDpStart::~TrexStatelessDpStart() {
bool
TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) {
- dp_core->start_traffic(m_obj);
+ /* staet traffic */
+ dp_core->start_traffic(m_obj, m_duration,m_event_id);
+
return true;
}
@@ -47,7 +72,104 @@ TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) {
************************/
bool
TrexStatelessDpStop::handle(TrexStatelessDpCore *dp_core) {
- dp_core->stop_traffic(m_port_id);
+
+
+ dp_core->stop_traffic(m_port_id,m_stop_only_for_event_id,m_event_id);
return true;
}
+
+void TrexStatelessDpStop::on_node_remove(){
+ if ( m_core ) {
+ assert(m_core->m_non_active_nodes>0);
+ m_core->m_non_active_nodes--;
+ }
+}
+
+
+TrexStatelessCpToDpMsgBase * TrexStatelessDpPause::clone(){
+
+ TrexStatelessDpPause *new_msg = new TrexStatelessDpPause(m_port_id);
+ return new_msg;
+}
+
+
+bool TrexStatelessDpPause::handle(TrexStatelessDpCore *dp_core){
+ dp_core->pause_traffic(m_port_id);
+ return (true);
+}
+
+
+
+TrexStatelessCpToDpMsgBase * TrexStatelessDpResume::clone(){
+ TrexStatelessDpResume *new_msg = new TrexStatelessDpResume(m_port_id);
+ return new_msg;
+}
+
+bool TrexStatelessDpResume::handle(TrexStatelessDpCore *dp_core){
+ dp_core->resume_traffic(m_port_id);
+ return (true);
+}
+
+
+/**
+ * clone for DP stop message
+ *
+ */
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpStop::clone() {
+ TrexStatelessDpStop *new_msg = new TrexStatelessDpStop(m_port_id);
+
+ new_msg->set_event_id(m_event_id);
+ new_msg->set_wait_for_event_id(m_stop_only_for_event_id);
+ /* set back pointer to master */
+ new_msg->set_core_ptr(m_core);
+
+ return new_msg;
+}
+
+
+
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpQuit::clone(){
+
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpQuit();
+
+ return new_msg;
+}
+
+
+bool TrexStatelessDpQuit::handle(TrexStatelessDpCore *dp_core){
+
+ /* quit */
+ dp_core->quit_main_loop();
+ return (true);
+}
+
+bool TrexStatelessDpCanQuit::handle(TrexStatelessDpCore *dp_core){
+
+ if ( dp_core->are_all_ports_idle() ){
+ /* if all ports are idle quit now */
+ set_quit(true);
+ }
+ return (true);
+}
+
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpCanQuit::clone(){
+
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpCanQuit();
+
+ return new_msg;
+}
+
+
+/************************* messages from DP to CP **********************/
+bool
+TrexDpPortEventMsg::handle() {
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(m_port_id);
+ port->get_dp_events().handle_event(m_event_type, m_thread_id, m_event_id);
+
+ return (true);
+}
+
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index 381e146d..6bd0dbe3 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -22,9 +23,11 @@ limitations under the License.
#define __TREX_STATELESS_MESSAGING_H__
#include <msg_manager.h>
+#include <trex_dp_port_events.h>
class TrexStatelessDpCore;
class TrexStreamsCompiledObj;
+class CFlowGenListPerThread;
/**
* defines the base class for CP to DP messages
@@ -35,16 +38,40 @@ class TrexStatelessCpToDpMsgBase {
public:
TrexStatelessCpToDpMsgBase() {
+ m_quit_scheduler=false;
}
virtual ~TrexStatelessCpToDpMsgBase() {
}
+
+ virtual bool handle(TrexStatelessDpCore *dp_core) = 0;
+
/**
- * virtual function to handle a message
+ * clone the current message
*
*/
- virtual bool handle(TrexStatelessDpCore *dp_core) = 0;
+ virtual TrexStatelessCpToDpMsgBase * clone() = 0;
+
+ /* do we want to quit scheduler, can be set by handle function */
+ void set_quit(bool enable){
+ m_quit_scheduler=enable;
+ }
+
+ bool is_quit(){
+ return ( m_quit_scheduler);
+ }
+
+ /* this node is called from scheduler in case the node is free */
+ virtual void on_node_remove(){
+ }
+
+ /* no copy constructor */
+ TrexStatelessCpToDpMsgBase(TrexStatelessCpToDpMsgBase &) = delete;
+
+protected:
+ int m_event_id;
+ bool m_quit_scheduler;
};
/**
@@ -55,16 +82,59 @@ public:
class TrexStatelessDpStart : public TrexStatelessCpToDpMsgBase {
public:
- TrexStatelessDpStart(TrexStreamsCompiledObj *obj);
+ TrexStatelessDpStart(uint8_t m_port_id, int m_event_id, TrexStreamsCompiledObj *obj, double duration);
~TrexStatelessDpStart();
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
virtual bool handle(TrexStatelessDpCore *dp_core);
private:
+
+ uint8_t m_port_id;
+ int m_event_id;
TrexStreamsCompiledObj *m_obj;
+ double m_duration;
+
};
+class TrexStatelessDpPause : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpPause(uint8_t port_id) : m_port_id(port_id) {
+ }
+
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+
+private:
+ uint8_t m_port_id;
+};
+
+
+class TrexStatelessDpResume : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpResume(uint8_t port_id) : m_port_id(port_id) {
+ }
+
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+
+private:
+ uint8_t m_port_id;
+};
+
+
/**
* a message to stop traffic
*
@@ -74,13 +144,156 @@ class TrexStatelessDpStop : public TrexStatelessCpToDpMsgBase {
public:
TrexStatelessDpStop(uint8_t port_id) : m_port_id(port_id) {
+ m_stop_only_for_event_id=false;
+ m_event_id=0;
+ m_core = NULL;
}
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+
virtual bool handle(TrexStatelessDpCore *dp_core);
+ void set_core_ptr(CFlowGenListPerThread * core){
+ m_core = core;
+ }
+
+ CFlowGenListPerThread * get_core_ptr(){
+ return ( m_core);
+ }
+
+
+ void set_event_id(int event_id){
+ m_event_id = event_id;
+ }
+
+ void set_wait_for_event_id(bool wait){
+ m_stop_only_for_event_id = wait;
+ }
+
+ virtual void on_node_remove();
+
+
+ bool get_is_stop_by_event_id(){
+ return (m_stop_only_for_event_id);
+ }
+
+ int get_event_id(){
+ return (m_event_id);
+ }
+
private:
uint8_t m_port_id;
+ bool m_stop_only_for_event_id;
+ int m_event_id;
+ CFlowGenListPerThread * m_core ;
+
+};
+
+/**
+ * a message to Quit the datapath traffic. support only stateless for now
+ *
+ * @author hhaim
+ */
+class TrexStatelessDpQuit : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpQuit() {
+ }
+
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
};
+/**
+ * a message to check if both port are idel and exit
+ *
+ * @author hhaim
+ */
+class TrexStatelessDpCanQuit : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpCanQuit() {
+ }
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+};
+
+
+
+/************************* messages from DP to CP **********************/
+
+/**
+ * defines the base class for CP to DP messages
+ *
+ * @author imarom (27-Oct-15)
+ */
+class TrexStatelessDpToCpMsgBase {
+public:
+
+ TrexStatelessDpToCpMsgBase() {
+ }
+
+ virtual ~TrexStatelessDpToCpMsgBase() {
+ }
+
+ /**
+ * virtual function to handle a message
+ *
+ */
+ virtual bool handle() = 0;
+
+ /* no copy constructor */
+ TrexStatelessDpToCpMsgBase(TrexStatelessDpToCpMsgBase &) = delete;
+
+};
+
+
+/**
+ * a message indicating an event has happened on a port at the
+ * DP
+ *
+ */
+class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase {
+public:
+
+ TrexDpPortEventMsg(int thread_id, uint8_t port_id, TrexDpPortEvent::event_e type, int event_id) {
+ m_thread_id = thread_id;
+ m_port_id = port_id;
+ m_event_type = type;
+ m_event_id = event_id;
+ }
+
+ virtual bool handle();
+
+ int get_thread_id() {
+ return m_thread_id;
+ }
+
+ uint8_t get_port_id() {
+ return m_port_id;
+ }
+
+ TrexDpPortEvent::event_e get_event_type() {
+ return m_event_type;
+ }
+
+ int get_event_id() {
+ return m_event_id;
+ }
+
+private:
+ int m_thread_id;
+ uint8_t m_port_id;
+ TrexDpPortEvent::event_e m_event_type;
+ int m_event_id;
+
+};
#endif /* __TREX_STATELESS_MESSAGING_H__ */
+
diff --git a/src/stub/trex_stateless_stub.cpp b/src/stub/trex_stateless_stub.cpp
index de56e57a..199356d8 100644
--- a/src/stub/trex_stateless_stub.cpp
+++ b/src/stub/trex_stateless_stub.cpp
@@ -4,7 +4,8 @@
class CFlowGenListPerThread;
class TrexStatelessCpToDpMsgBase;
-TrexStatelessDpCore::TrexStatelessDpCore(unsigned char, CFlowGenListPerThread*) {
+void
+TrexStatelessDpCore::create(unsigned char, CFlowGenListPerThread*) {
m_thread_id = 0;
m_core = NULL;
diff --git a/src/time_histogram.cpp b/src/time_histogram.cpp
index f1b47e59..96796bfc 100755
--- a/src/time_histogram.cpp
+++ b/src/time_histogram.cpp
@@ -182,10 +182,10 @@ void CTimeHistogram::DumpWinMax(FILE *fd){
}
void CTimeHistogram::Dump(FILE *fd){
- fprintf (fd," min_delta : %lu usec \n",get_usec(m_min_delta));
+ fprintf (fd," min_delta : %lu usec \n", (ulong)get_usec(m_min_delta));
fprintf (fd," cnt : %lu \n",m_cnt);
fprintf (fd," high_cnt : %lu \n",m_high_cnt);
- fprintf (fd," max_d_time : %lu usec\n",get_usec(m_max_dt));
+ fprintf (fd," max_d_time : %lu usec\n", (ulong)get_usec(m_max_dt));
//fprintf (fd," average : %.0f usec\n", get_total_average());
fprintf (fd," sliding_average : %.0f usec\n", get_average_latency());
fprintf (fd," precent : %.1f %%\n",(100.0*(double)m_high_cnt/(double)m_cnt));
@@ -198,7 +198,7 @@ void CTimeHistogram::Dump(FILE *fd){
for (j=0; j<HISTOGRAM_SIZE_LOG; j++) {
for (i=0; i<HISTOGRAM_SIZE; i++) {
if (m_hcnt[j][i] >0 ) {
- fprintf (fd," h[%lu] : %lu \n",(base*(i+1)),m_hcnt[j][i]);
+ fprintf (fd," h[%u] : %llu \n",(base*(i+1)),(unsigned long long)m_hcnt[j][i]);
}
}
base=base*10;
diff --git a/src/tuple_gen.h b/src/tuple_gen.h
index 29adbd69..d34e27bc 100755
--- a/src/tuple_gen.h
+++ b/src/tuple_gen.h
@@ -553,6 +553,9 @@ public:
class CServerPoolBase {
public:
+
+ virtual ~CServerPoolBase() {}
+
virtual void GenerateTuple(CTupleBase& tuple) = 0;
virtual uint16_t GenerateOnePort(uint32_t idx) = 0;
virtual void Delete() = 0;
diff --git a/src/utl_json.cpp b/src/utl_json.cpp
index 990346f5..fb55be0a 100755
--- a/src/utl_json.cpp
+++ b/src/utl_json.cpp
@@ -25,7 +25,7 @@ limitations under the License.
std::string add_json(std::string name, uint32_t counter,bool last){
char buff[200];
- sprintf(buff,"\"%s\":%lu",name.c_str(),counter);
+ sprintf(buff,"\"%s\":%lu",name.c_str(), (ulong)counter);
std::string s= std::string(buff);
if (!last) {
s+=",";
@@ -35,7 +35,7 @@ std::string add_json(std::string name, uint32_t counter,bool last){
std::string add_json(std::string name, uint64_t counter,bool last){
char buff[200];
- sprintf(buff,"\"%s\":%llu",name.c_str(),counter);
+ sprintf(buff,"\"%s\":%llu",name.c_str(), (unsigned long long)counter);
std::string s= std::string(buff);
if (!last) {
s+=",";
diff --git a/src/utl_yaml.cpp b/src/utl_yaml.cpp
index 5f3ca735..828817e4 100755
--- a/src/utl_yaml.cpp
+++ b/src/utl_yaml.cpp
@@ -104,6 +104,8 @@ bool utl_yaml_read_uint16(const YAML::Node& node,
val = (uint16_t)val_tmp;
res=true;
}
+
+ return (res);
}
bool utl_yaml_read_bool(const YAML::Node& node,