diff options
author | Hanoh Haim <hhaim@cisco.com> | 2015-10-26 16:49:19 +0200 |
---|---|---|
committer | Hanoh Haim <hhaim@cisco.com> | 2015-10-26 16:49:19 +0200 |
commit | 587f97686900f757b173469a2b231ede6705c568 (patch) | |
tree | 97f41581406211346f7e4c757e055c42bf620752 /src | |
parent | 44d266232d124bb277f6dec965a04dd93e47fb55 (diff) |
stateless dp works with static packet
Diffstat (limited to 'src')
-rwxr-xr-x | src/bp_gtest.cpp | 4 | ||||
-rwxr-xr-x | src/bp_sim.cpp | 176 | ||||
-rwxr-xr-x | src/bp_sim.h | 188 | ||||
-rwxr-xr-x | src/main_dpdk.cpp | 35 | ||||
-rwxr-xr-x | src/msg_manager.cpp | 13 | ||||
-rwxr-xr-x | src/msg_manager.h | 9 | ||||
-rwxr-xr-x | src/pal/linux/mbuf.h | 2 |
7 files changed, 337 insertions, 90 deletions
diff --git a/src/bp_gtest.cpp b/src/bp_gtest.cpp index d0bc27bc..556aa414 100755 --- a/src/bp_gtest.cpp +++ b/src/bp_gtest.cpp @@ -2031,6 +2031,10 @@ public: virtual int send_node(CGenNode * node); + virtual int update_mac_addr_from_global_cfg(pkt_dir_t dir, rte_mbuf_t *m){ + return (0); + } + /** * flush all pending packets into the stream diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp index 936f5f0c..d72da70c 100755 --- a/src/bp_sim.cpp +++ b/src/bp_sim.cpp @@ -3114,10 +3114,6 @@ int CNodeGenerator::close_file(CFlowGenListPerThread * thread){ return (0); } -int CNodeGenerator::flush_one_node_to_file(CGenNode * node){ - BP_ASSERT(m_v_if); - return (m_v_if->send_node(node)); -} int CNodeGenerator::update_stats(CGenNode * node){ if ( m_preview_mode.getVMode() >2 ){ @@ -3455,49 +3451,59 @@ int CNodeGenerator::flush_file(dsec_t max_time, uint8_t type=node->m_type; - if ( likely( type == CGenNode::FLOW_PKT ) ) { - /* PKT */ - if ( !(node->is_repeat_flow()) || (always==false)) { - flush_one_node_to_file(node); - #ifdef _DEBUG - update_stats(node); - #endif - } - m_p_queue.pop(); - if ( node->is_last_in_flow() ) { - if ((node->is_repeat_flow()) && (always==false)) { - /* Flow is repeated, reschedule it */ - thread->reschedule_flow( node); - }else{ - /* Flow will not be repeated, so free node */ - thread->free_last_flow_node( node); - } - }else{ - node->update_next_pkt_in_flow(); - m_p_queue.push(node); - } + if ( type == CGenNode::STATELESS_PKT ) { + + flush_one_node_to_file(node); + /* in case of continues */ + node->m_time += 0.0001; /*TBD PPS*/ + m_p_queue.push(node); + /* no need per thread stats, it is too heavy */ + }else{ - if ((type == CGenNode::FLOW_FIF)) { - /* callback to our method */ + if ( likely( type == CGenNode::FLOW_PKT ) ) { + /* PKT */ + if ( !(node->is_repeat_flow()) || (always==false)) { + flush_one_node_to_file(node); + #ifdef _DEBUG + update_stats(node); + #endif + } m_p_queue.pop(); - if ( always == false) { - thread->m_cur_time_sec = node->m_time ; - - if ( thread->generate_flows_roundrobin(&done) <0){ - break; + if ( node->is_last_in_flow() ) { + if ((node->is_repeat_flow()) && (always==false)) { + /* Flow is repeated, reschedule it */ + thread->reschedule_flow( node); + }else{ + /* Flow will not be repeated, so free node */ + thread->free_last_flow_node( node); } - if (!done) { - node->m_time +=d_time; - m_p_queue.push(node); + }else{ + node->update_next_pkt_in_flow(); + m_p_queue.push(node); + } + }else{ + if ((type == CGenNode::FLOW_FIF)) { + /* callback to our method */ + m_p_queue.pop(); + if ( always == false) { + thread->m_cur_time_sec = node->m_time ; + + if ( thread->generate_flows_roundrobin(&done) <0){ + break; + } + if (!done) { + node->m_time +=d_time; + m_p_queue.push(node); + }else{ + thread->free_node(node); + } }else{ thread->free_node(node); } + }else{ - thread->free_node(node); + handle_slow_messages(type,node,thread,always); } - - }else{ - handle_slow_messages(type,node,thread,always); } } } @@ -3848,8 +3854,100 @@ void CFlowGenListPerThread::check_msgs(void){ void delay(int msec); +const uint8_t test_udp_pkt[]={ + 0x00,0x00,0x00,0x01,0x00,0x00, + 0x00,0x00,0x00,0x01,0x00,0x00, + 0x08,0x00, + + 0x45,0x00,0x00,0x81, + 0xaf,0x7e,0x00,0x00, + 0x12,0x11,0xd9,0x23, + 0x01,0x01,0x01,0x01, + 0x3d,0xad,0x72,0x1b, + + 0x11,0x11, + 0x11,0x11, + + 0x00,0x6d, + 0x00,0x00, + + 0x64,0x31,0x3a,0x61, + 0x64,0x32,0x3a,0x69,0x64, + 0x32,0x30,0x3a,0xd0,0x0e, + 0xa1,0x4b,0x7b,0xbd,0xbd, + 0x16,0xc6,0xdb,0xc4,0xbb,0x43, + 0xf9,0x4b,0x51,0x68,0x33,0x72, + 0x20,0x39,0x3a,0x69,0x6e,0x66,0x6f, + 0x5f,0x68,0x61,0x73,0x68,0x32,0x30,0x3a,0xee,0xc6,0xa3, + 0xd3,0x13,0xa8,0x43,0x06,0x03,0xd8,0x9e,0x3f,0x67,0x6f, + 0xe7,0x0a,0xfd,0x18,0x13,0x8d,0x65,0x31,0x3a,0x71,0x39, + 0x3a,0x67,0x65,0x74,0x5f,0x70,0x65,0x65,0x72,0x73,0x31, + 0x3a,0x74,0x38,0x3a,0x3d,0xeb,0x0c,0xbf,0x0d,0x6a,0x0d, + 0xa5,0x31,0x3a,0x79,0x31,0x3a,0x71,0x65,0x87,0xa6,0x7d, + 0xe7 +}; + + + + +void CFlowGenListPerThread::start_stateless_const_rate_demo(){ + + CGenNodeStateless * node= create_node_sl(); + + /* add periodic */ + node->m_type = CGenNode::STATELESS_PKT; + node->m_time = m_cur_time_sec+ 0.0 /* STREAM ISG */; + node->m_flags =0; + + /* set socket id */ + node->set_socket_id(m_node_gen.m_socket_id); + + /* build a mbuf from a packet */ + uint16_t pkt_size=sizeof(test_udp_pkt); + uint8_t * stream_pkt=(uint8_t *)test_udp_pkt; + + /* allocate const mbuf */ + rte_mbuf_t * m = CGlobalInfo::pktmbuf_alloc( node->get_socket_id(),pkt_size ); + assert(m); + char *p = rte_pktmbuf_append(m, pkt_size); + assert(p); + /* copy the packet */ + memcpy(p,stream_pkt,pkt_size); + + /* set dir 0 or 1 client or server */ + pkt_dir_t dir=0; + node->set_mbuf_cache_dir(dir); + + /* TBD repace the mac if req we should add flag */ + m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir,m); + + /* set the packet as a readonly */ + node->set_cache_mbuf(m); + + #if 0 + /* dump the packet */ + uint8_t *p1=rte_pktmbuf_mtod(m, uint8_t*); + uint16_t pkt_size1=rte_pktmbuf_pkt_len(m); + utl_DumpBuffer(stdout,p,pkt_size,0); + #endif + + m_node_gen.add_node((CGenNode *)node); + + + double old_offset=0.0; + + CGenNode * node_sync= create_node() ; + node_sync->m_type = CGenNode::FLOW_SYNC; + node_sync->m_time = m_cur_time_sec + SYNC_TIME_OUT ; + m_node_gen.add_node(node_sync); + + // TBD time + m_node_gen.flush_file(100000000,0.0, false,this,old_offset); +} + void CFlowGenListPerThread::start_stateless_daemon(){ /* todo sleep */ + start_stateless_const_rate_demo(); while (1) { delay(100); diff --git a/src/bp_sim.h b/src/bp_sim.h index 328b6ba1..4dae7837 100755 --- a/src/bp_sim.h +++ b/src/bp_sim.h @@ -360,6 +360,16 @@ public: */ virtual int flush_tx_queue(void)=0; + + /** + * update the source and destination mac-addr of a given mbuf by global database + * + * @param dir + * @param m + * + * @return + */ + virtual int update_mac_addr_from_global_cfg(pkt_dir_t dir, rte_mbuf_t *m)=0; public: @@ -1342,15 +1352,17 @@ class CCapFileFlowInfo ; /* this is a simple struct, do not add constructor and destractor here! we are optimizing the allocation dealocation !!! */ -struct CGenNode { + +struct CGenNodeBase { public: enum { - FLOW_PKT=0, - FLOW_FIF=1, - FLOW_DEFER_PORT_RELEASE=2, - FLOW_PKT_NAT=3, - FLOW_SYNC=4 /* called evey 1 msec */ + FLOW_PKT =0, + FLOW_FIF =1, + FLOW_DEFER_PORT_RELEASE =2, + FLOW_PKT_NAT =3, + FLOW_SYNC =4, /* called evey 1 msec */ + STATELESS_PKT =5 }; @@ -1358,7 +1370,7 @@ public: enum { NODE_FLAGS_DIR =1, NODE_FLAGS_MBUF_CACHE =2, - NODE_FLAGS_SAMPLE_RX_CHECK =4, + NODE_FLAGS_SAMPLE_RX_CHECK =4, NODE_FLAGS_LEARN_MODE =8, /* bits 3,4 MASK 0x18 wait for second direction packet */ NODE_FLAGS_LEARN_MSG_PROCESSED =0x10, /* got NAT msg */ @@ -1369,19 +1381,49 @@ public: NODE_FLAGS_INIT_START_FROM_SERVER_SIDE_SERVER_ADDR = 0x100 /* init packet start from server side with server addr */ }; + public: - /* C1 */ + /*********************************************/ + /* C1 must */ uint8_t m_type; uint8_t m_thread_id; /* zero base */ uint8_t m_socket_id; - uint8_t m_pad2; + uint8_t m_pad2; uint16_t m_src_port; uint16_t m_flags; /* BIT 0 - DIR , BIT 1 - mbug_cache BIT 2 - SAMPLE DUPLICATE */ - double m_time; + double m_time; /* can't change this header - size 16 bytes*/ + +public: + bool operator <(const CGenNodeBase * rsh ) const { + return (m_time<rsh->m_time); + } + bool operator ==(const CGenNodeBase * rsh ) const { + return (m_time==rsh->m_time); + } + bool operator >(const CGenNodeBase * rsh ) const { + return (m_time>rsh->m_time); + } + +public: + void set_socket_id(socket_id_t socket){ + m_socket_id=socket; + } + + socket_id_t get_socket_id(){ + return ( m_socket_id ); + } + + +}; + + +struct CGenNode : public CGenNodeBase { + +public: uint32_t m_src_ip; /* client ip */ uint32_t m_dest_ip; /* server ip */ @@ -1411,25 +1453,8 @@ public: uint32_t m_end_of_cache_line[6]; public: - bool operator <(const CGenNode * rsh ) const { - return (m_time<rsh->m_time); - } - bool operator ==(const CGenNode * rsh ) const { - return (m_time==rsh->m_time); - } - bool operator >(const CGenNode * rsh ) const { - return (m_time>rsh->m_time); - } -public: void Dump(FILE *fd); - void set_socket_id(socket_id_t socket){ - m_socket_id=socket; - } - - socket_id_t get_socket_id(){ - return ( m_socket_id ); - } static void DumpHeader(FILE *fd); @@ -1612,6 +1637,60 @@ public: } __rte_cache_aligned; +/* this is a event for stateless */ +struct CGenNodeStateless : public CGenNodeBase { +public: + + +private: + void * m_cache_mbuf; + + + uint64_t m_pad_end[13]; + + +public: + + void set_socket_id(socket_id_t socket){ + m_socket_id=socket; + } + + socket_id_t get_socket_id(){ + return ( m_socket_id ); + } + + inline void set_mbuf_cache_dir(pkt_dir_t dir){ + if (dir) { + m_flags |=NODE_FLAGS_DIR; + }else{ + m_flags &=~NODE_FLAGS_DIR; + } + } + + inline pkt_dir_t get_mbuf_cache_dir(){ + return ((pkt_dir_t)( m_flags &1)); + } + + + + inline void set_cache_mbuf(rte_mbuf_t * m){ + m_cache_mbuf=(void *)m; + m_flags |= NODE_FLAGS_MBUF_CACHE; + } + + inline rte_mbuf_t * get_cache_mbuf(){ + if ( m_flags &NODE_FLAGS_MBUF_CACHE ) { + return ((rte_mbuf_t *)m_cache_mbuf); + }else{ + return ((rte_mbuf_t *)0); + } + } + + +} __rte_cache_aligned; ; + + + #if __x86_64__ /* size of 64 bytes */ #define DEFER_CLIENTS_NUM (16) @@ -1658,19 +1737,30 @@ public: need to clean this up and derive this objects from base object but require too much refactoring right now hhaim */ + +#define COMPARE_NODE_OBJECT(NODE_NAME) if ( sizeof(NODE_NAME) != sizeof(CGenNode) ) { \ + printf("ERROR sizeof(%s) %lu != sizeof(CGenNode) %lu must be the same size \n",#NODE_NAME,sizeof(NODE_NAME),sizeof(CGenNode)); \ + assert(0); \ + }\ + if ( (int)offsetof(struct NODE_NAME,m_type)!=offsetof(struct CGenNodeBase,m_type) ){\ + printf("ERROR offsetof(struct %s,m_type)!=offsetof(struct CGenNodeBase,m_type) \n",#NODE_NAME);\ + assert(0);\ + }\ + if ( (int)offsetof(struct CGenNodeDeferPort,m_time)!=offsetof(struct CGenNodeBase,m_time) ){\ + printf("ERROR offsetof(struct %s,m_time)!=offsetof(struct CGenNodeBase,m_time) \n",#NODE_NAME);\ + assert(0);\ + } + +#define COMPARE_NODE_OBJECT_SIZE(NODE_NAME) if ( sizeof(NODE_NAME) != sizeof(CGenNode) ) { \ + printf("ERROR sizeof(%s) %lu != sizeof(CGenNode) %lu must be the same size \n",#NODE_NAME,sizeof(NODE_NAME),sizeof(CGenNode)); \ + assert(0); \ + } + + + inline int check_objects_sizes(void){ - if ( sizeof(CGenNodeDeferPort) != sizeof(CGenNode) ) { - printf("ERROR sizeof(CGenNodeDeferPort) %lu != sizeof(CGenNode) %lu must be the same size \n",sizeof(CGenNodeDeferPort),sizeof(CGenNode)); - assert(0); - } - if ( (int)offsetof(struct CGenNodeDeferPort,m_type)!=offsetof(struct CGenNode,m_type) ){ - printf("ERROR offsetof(struct CGenNodeDeferPort,m_type)!=offsetof(struct CGenNode,m_type) \n"); - assert(0); - } - if ( (int)offsetof(struct CGenNodeDeferPort,m_time)!=offsetof(struct CGenNode,m_time) ){ - printf("ERROR offsetof(struct CGenNodeDeferPort,m_time)!=offsetof(struct CGenNode,m_time) \n"); - assert(0); - } + COMPARE_NODE_OBJECT(CGenNodeDeferPort); + COMPARE_NODE_OBJECT_SIZE(CGenNodeStateless); return (0); } @@ -1729,6 +1819,11 @@ public: virtual int write_pkt(CCapPktRaw *pkt_raw); virtual int close_file(void); + virtual int update_mac_addr_from_global_cfg(pkt_dir_t dir, rte_mbuf_t *m){ + return (0); + } + + /** * send one packet @@ -1790,6 +1885,10 @@ public: return (0); } + virtual int update_mac_addr_from_global_cfg(pkt_dir_t dir, rte_mbuf_t *m){ + return (0); + } + virtual int send_node(CGenNode * node); @@ -1841,7 +1940,9 @@ public: private: - int flush_one_node_to_file(CGenNode * node); + inline int flush_one_node_to_file(CGenNode * node){ + return (m_v_if->send_node(node)); + } int update_stats(CGenNode * node); FORCE_NO_INLINE void handle_slow_messages(uint8_t type, CGenNode * node, @@ -3336,6 +3437,11 @@ public : inline CGenNode * create_node(void); + inline CGenNodeStateless * create_node_sl(void){ + return ((CGenNodeStateless*)create_node() ); + } + + inline void free_node(CGenNode *p); inline void free_last_flow_node(CGenNode *p); @@ -3344,6 +3450,8 @@ public: void Clean(); void start_generate_stateful(std::string erf_file_name,CPreviewMode &preview); void start_stateless_daemon(); + void start_stateless_const_rate_demo(); + void Dump(FILE *fd); void DumpCsv(FILE *fd); diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index 6f031ae1..a64211cf 100755 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -1869,6 +1869,8 @@ public: bool process_rx_pkt(pkt_dir_t dir,rte_mbuf_t * m); + virtual int update_mac_addr_from_global_cfg(pkt_dir_t dir, rte_mbuf_t *m); + public: void GetCoreCounters(CVirtualIFPerSideStats *stats); @@ -1881,7 +1883,7 @@ public: return ( CGlobalInfo::m_socket.port_to_socket( m_ports[0].m_port->get_port_id() ) ); } -private: +protected: int send_burst(CCorePerPort * lp_port, uint16_t len, @@ -1892,7 +1894,7 @@ private: -private: +protected: uint8_t m_core_id; uint16_t m_mbuf_cache; CCorePerPort m_ports[CS_NUM]; /* each core has 2 tx queues 1. client side and server side */ @@ -2007,6 +2009,7 @@ void CCoreEthIF::flush_rx_queue(void){ } } + int CCoreEthIF::flush_tx_queue(void){ /* flush both sides */ pkt_dir_t dir ; @@ -2163,15 +2166,22 @@ void CCoreEthIF::update_mac_addr(CGenNode * node,uint8_t *p){ -int CCoreEthIFStateless::send_node(CGenNode * node){ - /*CGenNode * node*/ - /* fill the info needed by stateless */ - printf(" send node stateless \n"); +int CCoreEthIFStateless::send_node(CGenNode * no){ + CGenNodeStateless * node_sl=(CGenNodeStateless *) no; + /* check that we have mbuf */ + rte_mbuf_t * m=node_sl->get_cache_mbuf(); + assert( m ); + pkt_dir_t dir=(pkt_dir_t)node_sl->get_mbuf_cache_dir(); + CCorePerPort * lp_port=&m_ports[dir]; + CVirtualIFPerSideStats * lp_stats = &m_stats[dir]; + rte_pktmbuf_refcnt_update(m,1); + send_pkt(lp_port,m,lp_stats); return (0); }; + int CCoreEthIF::send_node(CGenNode * node){ if ( unlikely( node->get_cache_mbuf() !=NULL ) ) { @@ -2265,6 +2275,19 @@ int CCoreEthIF::send_node(CGenNode * node){ } +int CCoreEthIF::update_mac_addr_from_global_cfg(pkt_dir_t dir, + rte_mbuf_t *m){ + assert(m); + assert(dir<2); + CCorePerPort * lp_port=&m_ports[dir]; + uint8_t *p=rte_pktmbuf_mtod(m, uint8_t*); + uint8_t p_id=lp_port->m_port->get_port_id(); + + memcpy(p,CGlobalInfo::m_options.get_dst_src_mac_addr(p_id),12); + return (0); +} + + class CLatencyHWPort : public CPortLatencyHWBase { public: diff --git a/src/msg_manager.cpp b/src/msg_manager.cpp index 4db96583..9f41d08c 100755 --- a/src/msg_manager.cpp +++ b/src/msg_manager.cpp @@ -26,7 +26,7 @@ limitations under the License. /*TBD: need to fix socket_id for NUMA */ -bool CMessagingManager::Create(uint8_t num_dp_threads){ +bool CMessagingManager::Create(uint8_t num_dp_threads,std::string a_name){ m_num_dp_threads=num_dp_threads; assert(m_dp_to_cp==0); assert(m_cp_to_dp==0); @@ -38,11 +38,11 @@ bool CMessagingManager::Create(uint8_t num_dp_threads){ char name[100]; lp=getRingCpToDp(i); - sprintf(name,"cp_to_dp_%d",i); + sprintf(name,"%s_to_%d",(char *)a_name.c_str(),i); assert(lp->Create(std::string(name),1024,0)==true); lp=getRingDpToCp(i); - sprintf(name,"dp_to_cp_%d",i); + sprintf(name,"%s_from_%d",(char *)a_name.c_str(),i); assert(lp->Create(std::string(name),1024,0)==true); } @@ -89,7 +89,12 @@ CMsgIns * CMsgIns::Ins(void){ } bool CMsgIns::Create(uint8_t num_threads){ - return ( m_rx_dp.Create(num_threads) ); + + bool res = m_cp_dp.Create(num_threads,"cp_dp"); + if (!res) { + return (res); + } + return (m_rx_dp.Create(num_threads,"rx_dp")); } diff --git a/src/msg_manager.h b/src/msg_manager.h index b25660bb..6308eb1b 100755 --- a/src/msg_manager.h +++ b/src/msg_manager.h @@ -23,6 +23,7 @@ limitations under the License. #include "CRing.h" +#include <string> /* messages from CP->DP Ids */ @@ -71,7 +72,7 @@ public: m_dp_to_cp=0; m_num_dp_threads=0; } - bool Create(uint8_t num_dp_threads); + bool Create(uint8_t num_dp_threads,std::string name); void Delete(); CNodeRing * getRingCpToDp(uint8_t thread_id); CNodeRing * getRingDpToCp(uint8_t thread_id); @@ -94,12 +95,18 @@ public: CMessagingManager * getRxDp(){ return (&m_rx_dp); } + CMessagingManager * getCpDp(){ + return (&m_cp_dp); + } + uint8_t get_num_threads(){ return (m_rx_dp.get_num_threads()); } private: CMessagingManager m_rx_dp; + CMessagingManager m_cp_dp; + private: /* one instance */ diff --git a/src/pal/linux/mbuf.h b/src/pal/linux/mbuf.h index 693b095a..41274e64 100755 --- a/src/pal/linux/mbuf.h +++ b/src/pal/linux/mbuf.h @@ -185,7 +185,9 @@ static inline void utl_rte_pktmbuf_add_last(rte_mbuf_t *m,rte_mbuf_t *m_last){ #define __rte_cache_aligned + #define CACHE_LINE_SIZE 64 +#define RTE_CACHE_LINE_SIZE 64 #define SOCKET_ID_ANY 0 |