diff options
Diffstat (limited to 'src')
33 files changed, 1061 insertions, 456 deletions
diff --git a/src/bp_sim.h b/src/bp_sim.h index 4b1a88e3..cd85e82b 100755 --- a/src/bp_sim.h +++ b/src/bp_sim.h @@ -41,7 +41,7 @@ limitations under the License. #include <common/Network/Packet/IPv6Header.h> #include <common/Network/Packet/EthernetHeader.h> #include <math.h> -#include <common/bitMan.h> +#include <common/bitMan.h> #include <yaml-cpp/yaml.h> #include "trex_defs.h" #include "os_time.h" @@ -97,7 +97,7 @@ public: MIN_VM_V6=1 // IPv6 addressing }; uint8_t m_cmd; - uint8_t m_flags; + uint8_t m_flags; uint16_t m_start_0; uint16_t m_stop_1; uint16_t m_add_pkt_len; /* request more length for mbuf packet the size */ @@ -116,16 +116,16 @@ public: uint16_t m_server_port; }; -/* this command replace IP in 2 diffrent location and port +/* this command replace IP in 2 diffrent location and port -c = 10.1.1.2 -o = 10.1.1.2 +c = 10.1.1.2 +o = 10.1.1.2 m = audio 102000 ==> -c = xx.xx.xx.xx -o = xx.xx.xx.xx +c = xx.xx.xx.xx +o = xx.xx.xx.xx m = audio yyyy */ @@ -248,7 +248,7 @@ class CFlowGenListPerThread ; /* callback */ void on_node_first(uint8_t plugin_id,CGenNode * node, - CFlowYamlInfo * template_info, + CFlowYamlInfo * template_info, CTupleTemplateGeneratorSmart * tuple_gen, CFlowGenListPerThread * flow_gen ); @@ -259,7 +259,7 @@ rte_mbuf_t * on_node_generate_mbuf(uint8_t plugin_id,CGenNode * node,CFlowPk class CPreviewMode ; struct CGenNode; -/* represent the virtual interface +/* represent the virtual interface */ /* counters per side */ @@ -276,7 +276,7 @@ public: uint64_t m_tx_drop; uint64_t m_tx_queue_full; uint64_t m_tx_alloc_error; - tx_per_flow_t m_tx_per_flow[MAX_FLOW_STATS]; + tx_per_flow_t m_tx_per_flow[MAX_FLOW_STATS]; CPerTxthreadTemplateInfo m_template; public: @@ -309,10 +309,10 @@ public: void CVirtualIFPerSideStats::Dump(FILE *fd){ #define DP_B(f) if (f) printf(" %-40s : %lu \n",#f,f) - DP_B(m_tx_pkt); + DP_B(m_tx_pkt); DP_B(m_tx_rx_check_pkt); - DP_B(m_tx_bytes); - DP_B(m_tx_drop); + DP_B(m_tx_bytes); + DP_B(m_tx_drop); DP_B(m_tx_alloc_error); DP_B(m_tx_queue_full); m_template.Dump(fd); @@ -342,17 +342,17 @@ public: /** * send one packet - * + * * @param node - * - * @return + * + * @return */ virtual int send_node(CGenNode * node) =0; /** * send one packet to a specific dir. flush all packets - * + * * @param dir * @param m */ @@ -361,26 +361,29 @@ public: /** - * flush all pending packets into the stream - * - * @return + * flush all pending packets into the stream + * + * @return */ virtual int flush_tx_queue(void)=0; - + // read all packets from rx_queue on dp core + virtual void flush_dp_rx_queue(void) {}; + // read all packets from rx queue + virtual void flush_rx_queue(void) {}; /** * update the source and destination mac-addr of a given mbuf by global database - * + * * @param dir * @param m - * - * @return + * + * @return */ virtual int update_mac_addr_from_global_cfg(pkt_dir_t dir, uint8_t * p)=0; /** * translate a port_id to the correct dir on the core - * + * */ virtual pkt_dir_t port_id_to_dir(uint8_t port_id) { return (CS_INVALID); @@ -602,13 +605,13 @@ public: } } - bool get_is_rx_check_enable(){ - return (btGetMaskBit32(m_flags,31,31) ? true:false); - } + bool get_is_rx_check_enable(){ + return (btGetMaskBit32(m_flags,31,31) ? true:false); + } - void set_rx_check_enable(bool enable){ - btSetMaskBit32(m_flags,31,31,enable?1:0); - } + void set_rx_check_enable(bool enable){ + btSetMaskBit32(m_flags,31,31,enable?1:0); + } bool get_mac_ip_features_enable(){ @@ -693,7 +696,7 @@ public: u.m_mac.dest[3]=1; u.m_mac.src[3]=1; } - union { + union { mac_align_t m_mac; uint8_t m_data[16]; } u; @@ -717,10 +720,10 @@ public: }; enum trex_learn_mode_e { - LEARN_MODE_DISABLED=0, - LEARN_MODE_TCP_ACK=1, - LEARN_MODE_IP_OPTION=2, - LEARN_MODE_MAX=LEARN_MODE_IP_OPTION + LEARN_MODE_DISABLED=0, + LEARN_MODE_TCP_ACK=1, + LEARN_MODE_IP_OPTION=2, + LEARN_MODE_MAX=LEARN_MODE_IP_OPTION }; public: @@ -736,7 +739,7 @@ public: m_expected_portd = 4; /* should be at least the number of ports found in the system but could be less */ m_vlan_port[0]=100; m_vlan_port[1]=100; - m_rx_check_sample=0; + m_rx_check_sample=0; m_rx_check_hops = 0; m_io_mode=1; m_run_flags=0; @@ -759,12 +762,12 @@ public: uint32_t m_latency_rate; /* pkt/sec for each thread/port zero disable */ uint32_t m_latency_mask; uint32_t m_latency_prev; - uint16_t m_rx_check_sample; /* the sample rate of flows */ + uint16_t m_rx_check_sample; /* the sample rate of flows */ uint16_t m_rx_check_hops; uint16_t m_zmq_port; uint16_t m_telnet_port; uint16_t m_expected_portd; - uint16_t m_io_mode; //0,1,2 0 disable, 1- normal , 2 - short + uint16_t m_io_mode; //0,1,2 0 disable, 1- normal , 2 - short uint16_t m_run_flags; uint8_t m_mac_splitter; uint8_t m_l_pkt_mode; @@ -782,7 +785,7 @@ public: std::string out_file; std::string prefix; - + CMacAddrCfg m_mac_addr[TREX_MAX_PORTS]; uint8_t * get_src_mac_addr(int if_index){ @@ -861,7 +864,7 @@ public: void Dump(FILE *fd); public: - uint32_t m_mbuf[MBUF_SIZE]; // relative to traffic norm to 2x10G ports + uint32_t m_mbuf[MBUF_SIZE]; // relative to traffic norm to 2x10G ports uint32_t m_num_cores; }; @@ -869,28 +872,28 @@ public: typedef uint8_t socket_id_t; typedef uint8_t port_id_t; /* the real phsical thread id */ -typedef uint8_t physical_thread_id_t; +typedef uint8_t physical_thread_id_t; -typedef uint8_t virtual_thread_id_t; -/* +typedef uint8_t virtual_thread_id_t; +/* + + virtual thread 0 (v0)- is always the master - virtual thread 0 (v0)- is always the master - -for 2 dual ports ( 2x2 =4 ports) the virtual thread looks like that +for 2 dual ports ( 2x2 =4 ports) the virtual thread looks like that ----------------- DEFAULT: ----------------- (0,1) (2,3) dual-if0 dual-if-1 v1 v2 - v3 v4 + v3 v4 v5 v6 - v7 v8 - - rx is v9 + v7 v8 + + rx is v9 - */ + */ #define MAX_SOCKETS_SUPPORTED (4) #define MAX_THREADS_SUPPORTED (120) @@ -904,12 +907,12 @@ public: /* is socket enabled */ virtual bool is_sockets_enable(socket_id_t socket)=0; - + /* number of main active sockets. socket #0 is always used */ virtual socket_id_t max_num_active_sockets()=0; virtual ~CPlatformSocketInfoBase() {} - + public: /* which socket to allocate memory to each port */ virtual socket_id_t port_to_socket(port_id_t port)=0; @@ -949,7 +952,7 @@ public: /* is socket enabled */ bool is_sockets_enable(socket_id_t socket); - + /* number of main active sockets. socket #0 is always used */ socket_id_t max_num_active_sockets(); @@ -995,7 +998,7 @@ public: /* is socket enabled */ bool is_sockets_enable(socket_id_t socket); - + /* number of main active sockets. socket #0 is always used */ socket_id_t max_num_active_sockets(); @@ -1033,7 +1036,7 @@ private: bool m_sockets_enable[MAX_SOCKETS_SUPPORTED]; uint32_t m_sockets_enabled; socket_id_t m_socket_per_dual_if[(TREX_MAX_PORTS >> 1)]; - + uint32_t m_max_threads_per_dual_if; uint32_t m_num_dual_if; @@ -1058,7 +1061,7 @@ public: /* is socket enabled */ bool is_sockets_enable(socket_id_t socket); - + /* number of main active sockets. socket #0 is always used */ socket_id_t max_num_active_sockets(); @@ -1141,15 +1144,15 @@ public: public: rte_mempool_t * m_small_mbuf_pool; /* pool for start packets */ - rte_mempool_t * m_mbuf_pool_128; - rte_mempool_t * m_mbuf_pool_256; - rte_mempool_t * m_mbuf_pool_512; - rte_mempool_t * m_mbuf_pool_1024; - rte_mempool_t * m_mbuf_pool_2048; - rte_mempool_t * m_mbuf_pool_4096; - rte_mempool_t * m_mbuf_pool_9k; + rte_mempool_t * m_mbuf_pool_128; + rte_mempool_t * m_mbuf_pool_256; + rte_mempool_t * m_mbuf_pool_512; + rte_mempool_t * m_mbuf_pool_1024; + rte_mempool_t * m_mbuf_pool_2048; + rte_mempool_t * m_mbuf_pool_4096; + rte_mempool_t * m_mbuf_pool_9k; - rte_mempool_t * m_mbuf_global_nodes; + rte_mempool_t * m_mbuf_global_nodes; uint32_t m_pool_id; }; @@ -1167,16 +1170,16 @@ public: return ( m_mem_pool[socket].pktmbuf_alloc_small() ); } - + /** - * try to allocate small buffers too - * _alloc allocate big buffers only - * + * try to allocate small buffers too + * _alloc allocate big buffers only + * * @param socket * @param size - * - * @return + * + * @return */ static inline rte_mbuf_t * pktmbuf_alloc(socket_id_t socket,uint16_t size){ if (size<FIRST_PKT_SIZE) { @@ -1232,7 +1235,7 @@ public: public: static CRteMemPool m_mem_pool[MAX_SOCKETS_SUPPORTED]; - static uint32_t m_nodes_pool_size; + static uint32_t m_nodes_pool_size; static CParserOption m_options; static CGlobalMemory m_memory_cfg; static CPlatformSocketInfo m_socket; @@ -1320,19 +1323,19 @@ struct CFlowYamlInfo { m_server_pool_idx = 0; m_cap_mode=false; } - + std::string m_name; std::string m_client_pool_name; std::string m_server_pool_name; - double m_k_cps; //k CPS - double m_restart_time; /* restart time of this template */ - dsec_t m_ipg_sec; // ipg in sec + double m_k_cps; //k CPS + double m_restart_time; /* restart time of this template */ + dsec_t m_ipg_sec; // ipg in sec dsec_t m_rtt_sec; // rtt in sec - uint32_t m_w; + uint32_t m_w; uint32_t m_wlength; uint32_t m_limit; uint32_t m_flowcnt; - uint8_t m_plugin_id; /* 0 - default , 1 - RTSP160 , 2- RTSP250 */ + uint8_t m_plugin_id; /* 0 - default , 1 - RTSP160 , 2- RTSP250 */ uint8_t m_client_pool_idx; uint8_t m_server_pool_idx; bool m_one_app_server; @@ -1418,7 +1421,7 @@ public: NODE_FLAGS_LEARN_MSG_PROCESSED =0x10, /* got NAT msg */ NODE_FLAGS_LATENCY =0x20, /* got NAT msg */ - NODE_FLAGS_INIT_START_FROM_SERVER_SIDE = 0x40, + NODE_FLAGS_INIT_START_FROM_SERVER_SIDE = 0x40, NODE_FLAGS_ALL_FLOW_SAME_PORT_SIDE = 0x80, NODE_FLAGS_INIT_START_FROM_SERVER_SIDE_SERVER_ADDR = 0x100 /* init packet start from server side with server addr */ }; @@ -1434,8 +1437,8 @@ public: uint16_t m_src_port; uint16_t m_flags; /* BIT 0 - DIR , - BIT 1 - mbug_cache - BIT 2 - SAMPLE DUPLICATE */ + BIT 1 - mbug_cache + BIT 2 - SAMPLE DUPLICATE */ double m_time; /* can't change this header - size 16 bytes*/ @@ -1512,7 +1515,7 @@ public: /* is it possible to cache MBUF */ inline void update_next_pkt_in_flow(void); - inline void reset_pkt_in_flow(void); + inline void reset_pkt_in_flow(void); inline uint8_t get_plugin_id(void){ return ( m_template_info->m_plugin_id); } @@ -1567,8 +1570,8 @@ public: /* direction for TCP/UDP port */ inline pkt_dir_t cur_pkt_port_addr_dir(); /* from which interface dir to get out */ - inline pkt_dir_t cur_interface_dir(); - + inline pkt_dir_t cur_interface_dir(); + inline void set_mbuf_cache_dir(pkt_dir_t dir){ if (dir) { @@ -1597,19 +1600,19 @@ public: public: - inline void set_rx_check(){ - m_flags |= NODE_FLAGS_SAMPLE_RX_CHECK; - } + inline void set_rx_check(){ + m_flags |= NODE_FLAGS_SAMPLE_RX_CHECK; + } - inline bool is_rx_check_enabled(){ - return ((m_flags & NODE_FLAGS_SAMPLE_RX_CHECK)?true:false); - } + inline bool is_rx_check_enabled(){ + return ((m_flags & NODE_FLAGS_SAMPLE_RX_CHECK)?true:false); + } public: inline void set_nat_first_state(){ btSetMaskBit16(m_flags,4,3,1); - m_type=FLOW_PKT_NAT; + m_type=FLOW_PKT_NAT; } inline bool is_nat_first_state(){ @@ -1665,8 +1668,8 @@ public: bool is_external_is_eq_to_internal_ip(){ /* this API is used to check TRex itself */ - if ( (get_nat_ipv4_addr() == m_src_ip ) && - (get_nat_ipv4_port()==m_src_port) && + if ( (get_nat_ipv4_addr() == m_src_ip ) && + (get_nat_ipv4_port()==m_src_port) && ( get_nat_ipv4_addr_server() == m_dest_ip) ) { return (true); }else{ @@ -1704,7 +1707,7 @@ struct CGenNodeDeferPort { uint16_t m_ports[DEFER_CLIENTS_NUM]; uint8_t m_pool_idx[DEFER_CLIENTS_NUM]; public: - void init(void){ + void init(void){ m_type=CGenNode::FLOW_DEFER_PORT_RELEASE; m_cnt=0; } @@ -1724,7 +1727,7 @@ public: } __rte_cache_aligned ; -/* run time verification of objects size and offsets +/* run time verification of objects size and offsets need to clean this up and derive this objects from base object but require too much refactoring right now hhaim */ @@ -1817,19 +1820,19 @@ public: /** * send one packet - * + * * @param node - * - * @return + * + * @return */ virtual int send_node(CGenNode * node); /** - * flush all pending packets into the stream - * - * @return + * flush all pending packets into the stream + * + * @return */ virtual int flush_tx_queue(void); @@ -1858,7 +1861,7 @@ public: /** * same as regular STL but no I/O (dry run) - * + * * @author imarom (07-Jan-16) */ class CErfIFStlNull : public CErfIFStl { @@ -1959,7 +1962,7 @@ public: int open_file(std::string file_name, CPreviewMode * preview); int close_file(CFlowGenListPerThread * thread); - int flush_file(dsec_t max_time, + int flush_file(dsec_t max_time, dsec_t d_time, bool always, CFlowGenListPerThread * thread, @@ -2012,7 +2015,7 @@ public: CPreviewMode m_preview_mode; uint64_t m_cnt; uint64_t m_limit; - CTimeHistogram m_realtime_his; + CTimeHistogram m_realtime_his; }; @@ -2128,7 +2131,7 @@ inline bool CFlowKey::operator ==(const CFlowKey& rhs) const{ #define IS_PCAP_TIMING 7 -// 8-12 is used +// 8-12 is used #define FLOW_ID 8 @@ -2164,9 +2167,9 @@ public: } private: - // per direction info + // per direction info uint16_t m_dir_pkt_num; // pkt id - uint16_t m_max_dir_flow_pkts; + uint16_t m_max_dir_flow_pkts; }; @@ -2219,7 +2222,7 @@ public: } /** * start from zero 0,1,2,.. , it is on global flow if you have couple of flows it will count all of the flows - * + * * flow FlowPktNum * 0 0 * 0 1 @@ -2227,8 +2230,8 @@ public: * 1 0 * 1 1 * 2 0 - * - * @return + * + * @return */ inline uint32_t getFlowPktNum(){ return ( m_flow_pkt_num); @@ -2252,7 +2255,7 @@ public: } - /* return true if this packet in diff direction from prev flow packet , + /* return true if this packet in diff direction from prev flow packet , if true need to choose RTT else IPG for inter packet gap */ inline bool IsRtt(){ return (btGetMaskBit32(m_flags,IS_RTT,IS_RTT) ? true:false); @@ -2313,7 +2316,7 @@ public: inline void SetId(uint16_t _id){ btSetMaskBit32(m_flags,31,16,_id); - + } inline uint16_t getId(){ return ( ( uint16_t)btGetMaskBit32(m_flags,31,16)); @@ -2328,7 +2331,7 @@ public: return (btGetMaskBit32(m_flags,IS_LAST_PKT_S,IS_LAST_PKT_E) ? true:false); } - // there could be couple of flows per template in case of plugin + // there could be couple of flows per template in case of plugin inline void SetMaxPktsPerFlow(uint32_t pkts){ assert(pkts<65000); m_max_flow_pkts=pkts; @@ -2336,7 +2339,7 @@ public: inline uint16_t GetMaxPktsPerFlow(){ return ( m_max_flow_pkts ); } - // there could be couple of flows per template in case of plugin + // there could be couple of flows per template in case of plugin inline void SetMaxFlowTimeout(double sec){ //assert (sec<65000); sec = sec*2.0+5.0; @@ -2369,12 +2372,12 @@ public: private: uint32_t m_flags; - uint16_t m_flow_pkt_num; // packet number inside the flow - uint8_t m_plugin_id; // packet number inside the flow + uint16_t m_flow_pkt_num; // packet number inside the flow + uint8_t m_plugin_id; // packet number inside the flow uint8_t m_pad; uint16_t m_max_flow_pkts; // how many packet per this flow getFlowId() - uint16_t m_max_flow_aging; // maximum aging in sec - CPacketDescriptorPerDir m_per_dir[CS_NUM]; // per direction info + uint16_t m_max_flow_aging; // maximum aging in sec + CPacketDescriptorPerDir m_per_dir[CS_NUM]; // per direction info }; @@ -2427,7 +2430,7 @@ public: class CPacketIndication { public: - dsec_t m_cap_ipg; /* ipg from cap file */ + dsec_t m_cap_ipg; /* ipg from cap file */ CCapPktRaw * m_packet; CFlow * m_flow; @@ -2437,10 +2440,10 @@ public: IPv6Header * m_ipv6; } l3; bool m_is_ipv6; - union { + union { TCPHeader * m_tcp; UDPHeader * m_udp; - ICMPHeader * m_icmp; + ICMPHeader * m_icmp; } l4; uint8_t * m_payload; uint16_t m_payload_len; @@ -2489,10 +2492,10 @@ public: /** - * return the application ipv4/ipv6 option offset + * return the application ipv4/ipv6 option offset * if learn bit is ON , it is always the first options ( IPV6/IPV4) - * - * @return + * + * @return */ uint32_t getIpAppOptionOffset(){ if ( is_ipv6() ) { @@ -2585,7 +2588,7 @@ class CPacketParser { public: bool Create(); void Delete(); - bool ProcessPacket(CPacketIndication * pkt_indication, + bool ProcessPacket(CPacketIndication * pkt_indication, CCapPktRaw * raw_packet); public: CCPacketParserCounters m_counter; @@ -2706,12 +2709,12 @@ public: inline rte_mbuf_t * do_generate_new_mbuf_ex(CGenNode * node,CFlowInfo * flow_info); inline rte_mbuf_t * do_generate_new_mbuf_ex_big(CGenNode * node,CFlowInfo * flow_info); inline rte_mbuf_t * do_generate_new_mbuf_ex_vm(CGenNode * node, - CFlowInfo * flow_info, int16_t * s_size); + CFlowInfo * flow_info, int16_t * s_size); public: - /* push the number of bytes into the packets and make more room - should be used by NAT feature that should have ipv4 option in the first packet - this function should not be called in runtime, only when template is loaded due to it heavey cost of operation ( malloc/free memory ) + /* push the number of bytes into the packets and make more room + should be used by NAT feature that should have ipv4 option in the first packet + this function should not be called in runtime, only when template is loaded due to it heavey cost of operation ( malloc/free memory ) */ char * push_ipv4_option_offline(uint8_t bytes); char * push_ipv6_option_offline(uint8_t bytes); @@ -2720,10 +2723,10 @@ public: /** * mark this packet as learn packet - * should + * should * 1. push ipv4 option ( 8 bytes) - * 2. mask the packet as learn - * 3. update the option pointer + * 2. mask the packet as learn + * 3. update the option pointer */ void mask_as_learn(); @@ -2750,7 +2753,7 @@ private: public: CPacketIndication m_pkt_indication; - CCapPktRaw * m_packet; + CCapPktRaw * m_packet; rte_mbuf_t * m_big_mbuf[MAX_SOCKETS_SUPPORTED]; /* allocate big mbug per socket */ }; @@ -2764,10 +2767,10 @@ inline void CFlowPktInfo::update_pkt_info2(char *p, int update_len , CGenNode * node ){ - IPHeader * ipv4= + IPHeader * ipv4= (IPHeader *)(p + m_pkt_indication.getFastIpOffsetFast()); - EthernetHeader * et = + EthernetHeader * et = (EthernetHeader * )(p + m_pkt_indication.getFastEtherOffset()); (void)et; @@ -2820,7 +2823,7 @@ inline void CFlowPktInfo::update_pkt_info2(char *p, m_tcp->setSourcePort(flow_info->server_port); } } - + }else { if ( m_pkt_indication.m_desc.IsUdp() ){ UDPHeader * m_udp =(UDPHeader *)(p +m_pkt_indication.getFastTcpOffset() ); @@ -2848,10 +2851,10 @@ inline void CFlowPktInfo::update_pkt_info2(char *p, inline void CFlowPktInfo::update_pkt_info(char *p, CGenNode * node){ - IPHeader * ipv4= + IPHeader * ipv4= (IPHeader *)(p + m_pkt_indication.getFastIpOffsetFast()); - EthernetHeader * et = + EthernetHeader * et = (EthernetHeader * )(p + m_pkt_indication.getFastEtherOffset()); (void)et; @@ -2863,7 +2866,7 @@ inline void CFlowPktInfo::update_pkt_info(char *p, if ( unlikely (m_pkt_indication.is_ipv6())) { - + // Update the IPv6 address IPv6Header *ipv6= (IPv6Header *)ipv4; @@ -2886,22 +2889,22 @@ inline void CFlowPktInfo::update_pkt_info(char *p, ipv4->setTimeToLive(TTL_RESERVE_DUPLICATE); /* first ipv4 option add the info in case of learn packet, usualy only the first packet */ - if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_IP_OPTION)) { - CNatOption *lpNat =(CNatOption *)ipv4->getOption(); - lpNat->set_fid(node->get_short_fid()); - lpNat->set_thread_id(node->get_thread_id()); - } else { - // This method only work on first TCP SYN - if (ipv4->getProtocol() == IPPROTO_TCP) { - TCPHeader *tcp = (TCPHeader *)(((uint8_t *)ipv4) + ipv4->getHeaderLength()); - if (tcp->getSynFlag()) { - tcp->setAckNumber(CNatRxManager::calc_tcp_ack_val(node->get_short_fid(), node->get_thread_id())); - } + if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_IP_OPTION)) { + CNatOption *lpNat =(CNatOption *)ipv4->getOption(); + lpNat->set_fid(node->get_short_fid()); + lpNat->set_thread_id(node->get_thread_id()); + } else { + // This method only work on first TCP SYN + if (ipv4->getProtocol() == IPPROTO_TCP) { + TCPHeader *tcp = (TCPHeader *)(((uint8_t *)ipv4) + ipv4->getHeaderLength()); + if (tcp->getSynFlag()) { + tcp->setAckNumber(CNatRxManager::calc_tcp_ack_val(node->get_short_fid(), node->get_thread_id())); + } #ifdef NAT_TRACE_ - printf(" %.3f : flow_id: %x thread_id %x TCP ack %x\n",now_sec(), node->get_short_fid(), node->get_thread_id(), tcp->getAckNumber()); + printf(" %.3f : flow_id: %x thread_id %x TCP ack %x\n",now_sec(), node->get_short_fid(), node->get_thread_id(), tcp->getAckNumber()); #endif - } - } + } + } } /* in all cases update the ip using the outside ip */ @@ -3005,7 +3008,7 @@ inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf_ex(CGenNode * node, BP_ASSERT ( (((uintptr_t)m_packet->raw) & 0x7f )== 0) ; - memcpy(p,m_packet->raw,len); + memcpy(p,m_packet->raw,len); update_pkt_info2(p,flow_info,0,node); @@ -3014,7 +3017,7 @@ inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf_ex(CGenNode * node, return(m); } - + inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf_ex_big(CGenNode * node, CFlowInfo * flow_info){ rte_mbuf_t * m; @@ -3029,7 +3032,7 @@ inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf_ex_big(CGenNode * node, BP_ASSERT ( (((uintptr_t)m_packet->raw) & 0x7f )== 0) ; - memcpy(p,m_packet->raw,len); + memcpy(p,m_packet->raw,len); update_pkt_info2(p,flow_info,0,node); @@ -3055,7 +3058,7 @@ inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf_ex_vm(CGenNode * node, /* alloc big buffer to update it*/ m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), len); - assert(m); + assert(m); /* append the additional bytes requested and update later */ char *p=rte_pktmbuf_append(m, len); @@ -3063,7 +3066,7 @@ inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf_ex_vm(CGenNode * node, BP_ASSERT ( (((uintptr_t)m_packet->raw) & 0x7f )== 0) ; /* copy the headers until the payload */ - memcpy(p, m_packet->raw, m_pkt_indication.getPayloadOffset() ); + memcpy(p, m_packet->raw, m_pkt_indication.getPayloadOffset() ); CMiniVM vm; vm.m_pkt_info = this; vm.m_pyload_mbuf_ptr = p+m_pkt_indication.getPayloadOffset(); @@ -3077,7 +3080,7 @@ inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf_ex_vm(CGenNode * node, /* update IP length , and TCP checksum , we can accelerate this using hardware ! */ uint16_t pkt_adjust = vm.m_new_pkt_size - m_packet->pkt_len; update_pkt_info2(p,flow_info,pkt_adjust,node); - + /* return change in packet size due to packet tranforms */ *s_size = vm.m_new_pkt_size - m_packet->pkt_len; @@ -3110,7 +3113,7 @@ inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf(CGenNode * node){ BP_ASSERT ( (((uintptr_t)m_packet->raw) & 0x7f )== 0) ; - memcpy(p,m_packet->raw,len); + memcpy(p,m_packet->raw,len); update_pkt_info(p,node); @@ -3119,7 +3122,7 @@ inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf(CGenNode * node){ return m; } - + inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf_big(CGenNode * node){ rte_mbuf_t * m; uint16_t len = m_packet->pkt_len; @@ -3133,7 +3136,7 @@ inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf_big(CGenNode * node){ BP_ASSERT ( (((uintptr_t)m_packet->raw) & 0x7f )== 0) ; - memcpy(p,m_packet->raw,len); + memcpy(p,m_packet->raw,len); update_pkt_info(p,node); @@ -3206,15 +3209,15 @@ public: class CCapFileFlowInfo { public: enum load_cap_file_err { - kOK = 0, - kFileNotExist, - kNegTimestamp, - kNoSyn, - kTCPOffsetTooBig, - kNoTCPFromServer, - kPktNotSupp, - kPktProcessFail, - kCapFileErr + kOK = 0, + kFileNotExist, + kNegTimestamp, + kNoSyn, + kTCPOffsetTooBig, + kNoTCPFromServer, + kPktNotSupp, + kPktProcessFail, + kCapFileErr }; bool Create(); @@ -3253,7 +3256,7 @@ public: return (m_total_errors); } - // return the cap file length in sec + // return the cap file length in sec double get_cap_file_length_sec(); void get_total_memory(CCCapFileMemoryUsage & memory); @@ -3287,8 +3290,8 @@ public: // IPv4 addressing // IPv6 addressing - std::vector <uint16_t> m_src_ipv6; - std::vector <uint16_t> m_dst_ipv6; + std::vector <uint16_t> m_src_ipv6; + std::vector <uint16_t> m_dst_ipv6; bool m_ipv6_set; // new section @@ -3342,7 +3345,7 @@ public: double duration_sec; double m_cps; double m_mb_sec; - double m_mB_sec; + double m_mB_sec; double m_c_flows; double m_pps ; double m_total_Mbytes ; @@ -3367,7 +3370,7 @@ public: class CFlowGeneratorRecPerThread { public: - bool Create(CTupleGeneratorSmart * global_gen, + bool Create(CTupleGeneratorSmart * global_gen, CFlowYamlInfo * info, CFlowsYamlInfo * yaml_flow_info, CCapFileFlowInfo * flow_info, @@ -3388,11 +3391,11 @@ public: CCapFileFlowInfo * m_flow_info; CFlowYamlInfo * m_info; CFlowsYamlInfo * m_flows_info; - CPolicer m_policer; + CPolicer m_policer; uint16_t m_id ; uint32_t m_thread_id; bool m_tuple_gen_was_set; -} __rte_cache_aligned; +} __rte_cache_aligned; @@ -3405,16 +3408,16 @@ public: uint16_t _id); void Delete(); public: - + void Dump(FILE *fd); void getFlowStats(CFlowStats * stats); public: CCapFileFlowInfo m_flow_info; CFlowYamlInfo * m_info; CFlowsYamlInfo * m_flows_info; - CPolicer m_policer; + CPolicer m_policer; uint16_t m_id; -private: +private: void fixup_ipg_if_needed(); }; @@ -3423,7 +3426,7 @@ public: CPPSMeasure(){ reset(); } - //reset + //reset void reset(void){ m_start=false; m_last_time_msec=0; @@ -3453,7 +3456,7 @@ public: class CBwMeasure { public: CBwMeasure(); - //reset + //reset void reset(void); //add packet size double add(uint64_t size); @@ -3498,7 +3501,7 @@ public: friend class CNodeGenerator; friend class CPluginCallbackSimple; friend class CCapFileFlowInfo; - + typedef CGenericMap<flow_id_t,CGenNode> flow_id_node_t; bool Create(uint32_t thread_id, @@ -3518,23 +3521,23 @@ public: m_node_gen.set_vif(v_if); } - /* return the dual port ID this thread is attached to in 4 ports configuration - there are 2 dual-ports + /* return the dual port ID this thread is attached to in 4 ports configuration + there are 2 dual-ports thread 0 - dual 0 thread 1 - dual 1 thread 2 - dual 0 thread 3 - dual 1 - - */ + + */ uint32_t getDualPortId(); public : double get_total_kcps(); double get_total_kcps(uint8_t pool_idx, bool is_client); double get_delta_flow_is_sec(); - double get_longest_flow(); - double get_longest_flow(uint8_t pool_idx, bool is_client); + double get_longest_flow(); + double get_longest_flow(uint8_t pool_idx, bool is_client); void inc_current_template(void); int generate_flows_roundrobin(bool *done); int reschedule_flow(CGenNode *node); @@ -3627,9 +3630,9 @@ public: CFlowGenList * m_flow_list; rte_mempool_t * m_node_pool; - std::vector<CFlowGeneratorRecPerThread *> m_cap_gen; + std::vector<CFlowGeneratorRecPerThread *> m_cap_gen; - CFlowsYamlInfo m_yaml_info; + CFlowsYamlInfo m_yaml_info; CTupleGeneratorSmart m_smart_gen; @@ -3644,7 +3647,7 @@ public: double m_stop_time_sec; CPreviewMode m_preview_mode; -public: +public: CFlowGenStats m_stats; CBwMeasure m_mb_sec; CCpuUtlDp m_cpu_dp_u; @@ -3663,7 +3666,7 @@ private: bool m_terminated_by_master; private: - uint8_t m_cacheline_pad[RTE_CACHE_LINE_SIZE][19]; // improve prefech + uint8_t m_cacheline_pad[RTE_CACHE_LINE_SIZE][19]; // improve prefech } __rte_cache_aligned ; inline CGenNode * CFlowGenListPerThread::create_node(void){ @@ -3726,7 +3729,7 @@ public: public: std::vector<CFlowGeneratorRec *> m_cap_gen; /* global info */ CFlowsYamlInfo m_yaml_info; /* global yaml*/ - std::vector<CFlowGenListPerThread *> m_threads_info; + std::vector<CFlowGenListPerThread *> m_threads_info; CFlowGenListMac m_mac_info; }; @@ -3761,19 +3764,19 @@ inline void CCapFileFlowInfo::generate_flow(CTupleTemplateGeneratorSmart * tup node->m_flow_info = this; node->m_flags=0; node->m_template_info =template_info; - node->m_tuple_gen = tuple_gen->get_gen(); + node->m_tuple_gen = tuple_gen->get_gen(); node->m_src_ip= tuple.getClient(); node->m_dest_ip = tuple.getServer(); node->m_src_idx = tuple.getClientId(); node->m_dest_idx = tuple.getServerId(); node->m_src_port = tuple.getClientPort(); - memcpy(&node->m_src_mac, - tuple.getClientMac(), + memcpy(&node->m_src_mac, + tuple.getClientMac(), sizeof(mac_addr_align_t)); node->m_plugin_info =(void *)0; if ( unlikely( CGlobalInfo::is_learn_mode() ) ){ - // check if flow is two direction + // check if flow is two direction if ( lp->m_pkt_indication.m_desc.IsBiDirectionalFlow() ) { /* we are in learn mode */ CFlowGenListPerThread * lpThread=gen->Parent(); @@ -3822,7 +3825,7 @@ inline void CFlowGeneratorRecPerThread::generate_flow(CNodeGenerator * gen, uint64_t flow_id, CGenNode * node){ - m_flow_info->generate_flow(&tuple_gen, + m_flow_info->generate_flow(&tuple_gen, gen, time, flow_id, @@ -3869,7 +3872,7 @@ inline void CGenNode::update_next_pkt_in_flow(void){ m_pkt_info = m_flow_info->GetPacket((pkt_index-1)); } -inline void CGenNode::reset_pkt_in_flow(void){ +inline void CGenNode::reset_pkt_in_flow(void){ m_pkt_info = m_flow_info->GetPacket(0); } @@ -3898,7 +3901,7 @@ public: class CPluginCallbackSimple : public CPluginCallback { public: virtual void on_node_first(uint8_t plugin_id,CGenNode * node, - CFlowYamlInfo * template_info, + CFlowYamlInfo * template_info, CTupleTemplateGeneratorSmart * tuple_gen, CFlowGenListPerThread * flow_gen); virtual void on_node_last(uint8_t plugin_id,CGenNode * node); @@ -3950,7 +3953,7 @@ inline pkt_dir_t CGenNode::cur_interface_dir(){ return (is_eligible_from_server_side()?SERVER_SIDE:CLIENT_SIDE); }else{ return ( is_init ?CLIENT_SIDE:SERVER_SIDE); - } + } } diff --git a/src/common/Network/Packet/EthernetHeader.h b/src/common/Network/Packet/EthernetHeader.h index 87d1ed91..c9dcdbe2 100755 --- a/src/common/Network/Packet/EthernetHeader.h +++ b/src/common/Network/Packet/EthernetHeader.h @@ -1,5 +1,5 @@ /* -Copyright (c) 2015-2015 Cisco Systems, Inc. +Copyright (c) 2015-2016 Cisco Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ limitations under the License. #include "PacketHeaderBase.h" #include "MacAddress.h" +#define ETH_HDR_LEN 14 /** * This class encapsulates an ethernet header. diff --git a/src/common/basic_utils.cpp b/src/common/basic_utils.cpp index 34c37755..4f5578a6 100755 --- a/src/common/basic_utils.cpp +++ b/src/common/basic_utils.cpp @@ -17,6 +17,7 @@ limitations under the License. #include <ctype.h> #include <stdio.h> #include <string> +#include <sstream> bool utl_is_file_exists (const std::string& name) { if (FILE *file = fopen(name.c_str(), "r")) { @@ -175,3 +176,25 @@ void utl_macaddr_to_str(const uint8_t *macaddr, std::string &output) { } } + +/** + * generate a random connection handler + * + */ +std::string +utl_generate_random_str(unsigned int &seed, int len) { + std::stringstream ss; + + static const char alphanum[] = + "0123456789" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz"; + + /* generate 8 bytes of random handler */ + for (int i = 0; i < len; ++i) { + ss << alphanum[rand_r(&seed) % (sizeof(alphanum) - 1)]; + } + + return (ss.str()); +} + diff --git a/src/common/basic_utils.h b/src/common/basic_utils.h index 77282eea..63e858ab 100755 --- a/src/common/basic_utils.h +++ b/src/common/basic_utils.h @@ -21,8 +21,6 @@ limitations under the License. #include <stdio.h> #include <string> - - /** * the round must be power 2 e.g 2,4,8... * @@ -87,6 +85,8 @@ bool utl_is_file_exists (const std::string& name) ; void utl_macaddr_to_str(const uint8_t *macaddr, std::string &output); +std::string utl_generate_random_str(unsigned int &seed, int len); + #endif diff --git a/src/flow_stat.cpp b/src/flow_stat.cpp index 778c92b9..13f8eb16 100644 --- a/src/flow_stat.cpp +++ b/src/flow_stat.cpp @@ -18,6 +18,32 @@ See the License for the specific language governing permissions and limitations under the License. */ + +/* +Important classes in this file: +CFlowStatUserIdInfo - Information about one packet group id +CFlowStatUserIdMap - Mapping between packet group id (chosen by user) and hardware counter id +CFlowStatHwIdMap - Mapping between hardware id and packet group id +CFlowStatRuleMgr - API to users of the file + +General idea of operation: +For each stream needing flow statistics, the user provides packet group id (pg_id). Few streams can have the same pg_id. +We maintain reference count. +When doing start_stream, for the first stream in pg_id, hw_id is associated with the pg_id, and relevant hardware rules are +inserted (on supported hardware). When stopping all streams with the pg_id, the hw_id <--> pg_id mapping is removed, hw_id is +returned to the free hw_id pool, and hardware rules are removed. Counters for the pg_id are kept. +If starting streams again, new hw_id will be assigned, and counters will continue from where they stopped. Only When deleting +all streams using certain pg_id, infromation about this pg_id will be freed. + +For each stream we keep state in the m_rx_check.m_hw_id field. Since we keep reference count for certain structs, we want to +protect from illegal operations, like starting stream while it is already starting, stopping when it is stopped... +State machine is: +stream_init: HW_ID_INIT +stream_add: HW_ID_FREE +stream_start: legal hw_id (range is 0..MAX_FLOW_STATS) +stream_stop: HW_ID_FREE +stream_del: HW_ID_INIT + */ #include <sstream> #include <string> #include <iostream> @@ -26,6 +52,7 @@ #include "internal_api/trex_platform_api.h" #include "trex_stateless.h" #include "trex_stateless_messaging.h" +#include "trex_stateless_rx_core.h" #include "trex_stream.h" #include "flow_stat_parser.h" #include "flow_stat.h" @@ -33,8 +60,8 @@ #define FLOW_STAT_ADD_ALL_PORTS 255 -static const uint16_t FREE_HW_ID = UINT16_MAX; -static bool no_stat_supported = true; +static const uint16_t HW_ID_INIT = UINT16_MAX; +static const uint16_t HW_ID_FREE = UINT16_MAX - 1; inline std::string methodName(const std::string& prettyFunction) { @@ -48,6 +75,11 @@ inline std::string methodName(const std::string& prettyFunction) #define __METHOD_NAME__ methodName(__PRETTY_FUNCTION__) #ifdef __DEBUG_FUNC_ENTRY__ #define FUNC_ENTRY (std::cout << __METHOD_NAME__ << std::endl); +#ifdef __STREAM_DUMP__ +#define stream_dump(stream) stream->Dump(stderr) +#else +#define stream_dump(stream) +#endif #else #define FUNC_ENTRY #endif @@ -107,7 +139,7 @@ int CFlowStatUserIdInfo::add_stream(uint8_t proto) { #endif if (proto != m_proto) - return -1; + throw TrexException("Can't use same pg_id for streams with different l4 protocol"); m_ref_count++; @@ -147,7 +179,7 @@ uint16_t CFlowStatUserIdMap::get_hw_id(uint32_t user_id) { CFlowStatUserIdInfo *cf = find_user_id(user_id); if (cf == NULL) { - return FREE_HW_ID; + return HW_ID_FREE; } else { return cf->get_hw_id(); } @@ -198,7 +230,7 @@ int CFlowStatUserIdMap::add_stream(uint32_t user_id, uint8_t proto) { if (! c_user_id) { c_user_id = add_user_id(user_id, proto); if (! c_user_id) - return -1; + throw TrexException("Failed adding statistic counter - Failure in add_stream"); return 0; } else { return c_user_id->add_stream(proto); @@ -214,11 +246,11 @@ int CFlowStatUserIdMap::del_stream(uint32_t user_id) { c_user_id = find_user_id(user_id); if (! c_user_id) { - return -1; + throw TrexException("Trying to delete stream which does not exist"); } if (c_user_id->del_stream() == 0) { - // ref count of this port became 0. can release this entry. + // ref count of this entry became 0. can release this entry. m_map.erase(user_id); delete c_user_id; } @@ -237,13 +269,13 @@ int CFlowStatUserIdMap::start_stream(uint32_t user_id, uint16_t hw_id) { if (! c_user_id) { fprintf(stderr, "%s Error: Trying to associate hw id %d to user_id %d but it does not exist\n" , __func__, hw_id, user_id); - return -1; + throw TrexException("Internal error: Trying to associate non exist group id"); } if (c_user_id->is_hw_id()) { - fprintf(stderr, "%s Error: Trying to associate hw id %d to user_id %d but it is already associate to %u\n" + fprintf(stderr, "%s Error: Trying to associate hw id %d to user_id %d but it is already associated to %u\n" , __func__, hw_id, user_id, c_user_id->get_hw_id()); - return -1; + throw TrexException("Internal error: Trying to associate used packet group id to different hardware counter"); } c_user_id->set_hw_id(hw_id); c_user_id->add_started_stream(); @@ -260,9 +292,9 @@ int CFlowStatUserIdMap::start_stream(uint32_t user_id) { c_user_id = find_user_id(user_id); if (! c_user_id) { - fprintf(stderr, "%s Error: Trying to start stream on user_id %d but it does not exist\n" + fprintf(stderr, "%s Error: Trying to start stream on pg_id %d but it does not exist\n" , __func__, user_id); - return -1; + throw TrexException("Trying to start stream with non exist packet group id"); } c_user_id->add_started_stream(); @@ -281,9 +313,9 @@ int CFlowStatUserIdMap::stop_stream(uint32_t user_id) { c_user_id = find_user_id(user_id); if (! c_user_id) { - fprintf(stderr, "%s Error: Trying to stop stream on user_id %d but it does not exist\n" + fprintf(stderr, "%s Error: Trying to stop stream on pg_id %d but it does not exist\n" , __func__, user_id); - return -1; + throw TrexException("Trying to stop stream with non exist packet group id"); } return c_user_id->stop_started_stream(); @@ -332,7 +364,7 @@ uint16_t CFlowStatUserIdMap::unmap(uint32_t user_id) { CFlowStatHwIdMap::CFlowStatHwIdMap() { m_num_free = MAX_FLOW_STATS; for (int i = 0; i < MAX_FLOW_STATS; i++) { - m_map[i] = FREE_HW_ID; + m_map[i] = HW_ID_FREE; } } @@ -357,11 +389,11 @@ std::ostream& operator<<(std::ostream& os, const CFlowStatHwIdMap& cf) { uint16_t CFlowStatHwIdMap::find_free_hw_id() { for (int i = 0; i < MAX_FLOW_STATS; i++) { - if (m_map[i] == FREE_HW_ID) + if (m_map[i] == HW_ID_FREE) return i; } - return FREE_HW_ID; + return HW_ID_FREE; } void CFlowStatHwIdMap::map(uint16_t hw_id, uint32_t user_id) { @@ -378,7 +410,7 @@ void CFlowStatHwIdMap::unmap(uint16_t hw_id) { std::cout << __METHOD_NAME__ << " hw id:" << hw_id << std::endl; #endif - m_map[hw_id] = FREE_HW_ID; + m_map[hw_id] = HW_ID_FREE; m_num_free++; } @@ -388,6 +420,34 @@ CFlowStatRuleMgr::CFlowStatRuleMgr() { m_max_hw_id = -1; m_num_started_streams = 0; m_ring_to_rx = NULL; + m_capabilities = 0; + m_parser = NULL; + m_rx_core = NULL; +} + +CFlowStatRuleMgr::~CFlowStatRuleMgr() { + if (m_parser) + delete m_parser; +} + +void CFlowStatRuleMgr::create() { + uint16_t num_counters, capabilities; + TrexStateless *tstateless = get_stateless_obj(); + assert(tstateless); + + m_api = tstateless->get_platform_api(); + assert(m_api); + m_api->get_interface_stat_info(0, num_counters, capabilities); + m_api->get_port_num(m_num_ports); + for (uint8_t port = 0; port < m_num_ports; port++) { + assert(m_api->reset_hw_flow_stats(port) == 0); + } + m_ring_to_rx = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); + assert(m_ring_to_rx); + m_rx_core = get_rx_sl_core_obj(); + m_parser = m_api->get_flow_stat_parser(); + assert(m_parser); + m_capabilities = capabilities; } std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf) { @@ -397,110 +457,111 @@ std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf) { return os; } -int CFlowStatRuleMgr::compile_stream(const TrexStream * stream, Cxl710Parser &parser) { +int CFlowStatRuleMgr::compile_stream(const TrexStream * stream, CFlowStatParser *parser) { #ifdef __DEBUG_FUNC_ENTRY__ std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << " en:"; std::cout << stream->m_rx_check.m_enabled << std::endl; #endif - // currently we support only IP ID rule types - // all our ports are the same type, so testing port 0 is enough - uint16_t num_counters, capabilities; - m_api->get_interface_stat_info(0, num_counters, capabilities); - if ((capabilities & TrexPlatformApi::IF_STAT_IPV4_ID) == 0) { - return -2; - } - - if (parser.parse(stream->m_pkt.binary, stream->m_pkt.len) != 0) { + if (parser->parse(stream->m_pkt.binary, stream->m_pkt.len) != 0) { // if we could not parse the packet, but no stat count needed, it is probably OK. if (stream->m_rx_check.m_enabled) { fprintf(stderr, "Error: %s - Compilation failed\n", __func__); - return -1; + throw TrexException("Failed parsing given packet for flow stat. Probably bad packet format."); } else { return 0; } } - if (!parser.is_fdir_supported()) { + if (!parser->is_stat_supported()) { if (stream->m_stream_id <= 0) { - // rx stat not needed. Do nothing. + // flow stat not needed. Do nothing. return 0; } else { - // rx stat needed, but packet format is not supported - fprintf(stderr, "Error: %s - Unsupported packet format for rx stat\n", __func__); - return -1; + // flow stat needed, but packet format is not supported + fprintf(stderr, "Error: %s - Unsupported packet format for flow stat\n", __func__); + throw TrexException("Unsupported packet format for flow stat on given interface type"); } } return 0; } -int CFlowStatRuleMgr::add_stream(const TrexStream * stream) { +void CFlowStatRuleMgr::copy_state(TrexStream * from, TrexStream * to) { + to->m_rx_check.m_hw_id = from->m_rx_check.m_hw_id; +} +void CFlowStatRuleMgr::init_stream(TrexStream * stream) { + stream->m_rx_check.m_hw_id = HW_ID_INIT; +} + +int CFlowStatRuleMgr::add_stream(TrexStream * stream) { #ifdef __DEBUG_FUNC_ENTRY__ std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl; + stream_dump(stream); #endif + if (! stream->m_rx_check.m_enabled) { + return 0; + } + // Init everything here, and not in the constructor, since we relay on other objects // By the time a stream is added everything else is initialized. if (! m_api ) { - TrexStateless *tstateless = get_stateless_obj(); - m_api = tstateless->get_platform_api(); - uint16_t num_counters, capabilities; - m_api->get_interface_stat_info(0, num_counters, capabilities); - if ((capabilities & TrexPlatformApi::IF_STAT_IPV4_ID) == 0) { - // All our interfaces are from the same type. If statistics not supported. - // no operation will work - return -1; - } else { - no_stat_supported = false; - } - m_api->get_port_num(m_num_ports); - for (uint8_t port = 0; port < m_num_ports; port++) { - assert(m_api->reset_hw_flow_stats(port) == 0); - } - m_ring_to_rx = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); + create(); } - if (no_stat_supported) - return -ENOTSUP; + //??? put back assert(stream->m_rx_check.m_hw_id == HW_ID_INIT); - Cxl710Parser parser; - int ret; + uint16_t rule_type = TrexPlatformApi::IF_STAT_IPV4_ID; // In the future need to get it from the stream; - if (! stream->m_rx_check.m_enabled) { - return 0; + if ((m_capabilities & rule_type) == 0) { + fprintf(stderr, "Error: %s - rule type not supported by interface\n", __func__); + throw TrexException("Interface does not support given rule type"); } - if ((ret = compile_stream(stream, parser)) < 0) - return ret; + // compile_stream throws exception if something goes wrong + compile_stream(stream, m_parser); uint8_t l4_proto; - if (parser.get_l4_proto(l4_proto) < 0) { - printf("Error: %s failed finding l4 proto\n", __func__); - return -1; + if (m_parser->get_l4_proto(l4_proto) < 0) { + fprintf(stderr, "Error: %s failed finding l4 proto\n", __func__); + throw TrexException("Failed determining l4 proto for packet"); } - return m_user_id_map.add_stream(stream->m_rx_check.m_pg_id, l4_proto); + // throws exception if there is error + m_user_id_map.add_stream(stream->m_rx_check.m_pg_id, l4_proto); + + stream->m_rx_check.m_hw_id = HW_ID_FREE; + return 0; } -int CFlowStatRuleMgr::del_stream(const TrexStream * stream) { +int CFlowStatRuleMgr::del_stream(TrexStream * stream) { #ifdef __DEBUG_FUNC_ENTRY__ std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl; + stream_dump(stream); #endif - if (no_stat_supported) - return -ENOTSUP; - if (! stream->m_rx_check.m_enabled) { return 0; } - if (m_user_id_map.is_started(stream->m_rx_check.m_pg_id)) { - std::cerr << "Error: Trying to delete flow statistics stream " << stream->m_rx_check.m_pg_id - << " which is not stopped." << std::endl; - return -1; + if (! m_api) + throw TrexException("Called del_stream, but no stream was added"); + + // we got del_stream command for a stream which has valid hw_id. + // Probably someone forgot to call stop + if(stream->m_rx_check.m_hw_id < MAX_FLOW_STATS) { + stop_stream(stream); + } + + // calling del for same stream twice, or for a stream which was never "added" + if(stream->m_rx_check.m_hw_id == HW_ID_INIT) { + return 0; } + // Throws exception in case of error + m_user_id_map.del_stream(stream->m_rx_check.m_pg_id); + stream->m_rx_check.m_hw_id = HW_ID_INIT; - return m_user_id_map.del_stream(stream->m_rx_check.m_pg_id); + return 0; } // called on all streams, when stream start to transmit @@ -509,46 +570,73 @@ int CFlowStatRuleMgr::del_stream(const TrexStream * stream) { // If stream does not need flow stat counting, make sure it does not interfere with // other streams that do need stat counting. // Might change the IP ID of the stream packet -int CFlowStatRuleMgr::start_stream(TrexStream * stream, uint16_t &ret_hw_id) { +int CFlowStatRuleMgr::start_stream(TrexStream * stream) { #ifdef __DEBUG_FUNC_ENTRY__ std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl; + stream_dump(stream); #endif - Cxl710Parser parser; int ret; - - if (no_stat_supported) - return -ENOTSUP; - - if ((ret = compile_stream(stream, parser)) < 0) - return ret; + // Streams which does not need statistics might be started, before any stream that do + // need statistcs, so start_stream might be called before add_stream + if (! m_api ) { + create(); + } // first handle streams that do not need rx stat if (! stream->m_rx_check.m_enabled) { - // no need for stat count + try { + compile_stream(stream, m_parser); + } catch (TrexException) { + // If no statistics needed, and we can't parse the stream, that's OK. + return 0; + } + uint16_t ip_id; - if (parser.get_ip_id(ip_id) < 0) { - return 0; // if we could not find and ip id, no need to fix + if (m_parser->get_ip_id(ip_id) < 0) { + return 0; // if we could not find the ip id, no need to fix } // verify no reserved IP_ID used, and change if needed if (ip_id >= IP_ID_RESERVE_BASE) { - if (parser.set_ip_id(ip_id & 0xefff) < 0) { - return -1; + if (m_parser->set_ip_id(ip_id & 0xefff) < 0) { + throw TrexException("Stream IP ID in reserved range. Failed changing it"); } } return 0; } - uint16_t hw_id; // from here, we know the stream need rx stat + + // Starting a stream which was never added + if (stream->m_rx_check.m_hw_id == HW_ID_INIT) { + add_stream(stream); + } + + if (stream->m_rx_check.m_hw_id < MAX_FLOW_STATS) { + throw TrexException("Starting a stream which was already started"); + } + + uint16_t rule_type = TrexPlatformApi::IF_STAT_IPV4_ID; // In the future, need to get it from the stream; + + if ((m_capabilities & rule_type) == 0) { + fprintf(stderr, "Error: %s - rule type not supported by interface\n", __func__); + throw TrexException("Interface does not support given rule type"); + } + + // compile_stream throws exception if something goes wrong + if ((ret = compile_stream(stream, m_parser)) < 0) + return ret; + + uint16_t hw_id; + if (m_user_id_map.is_started(stream->m_rx_check.m_pg_id)) { m_user_id_map.start_stream(stream->m_rx_check.m_pg_id); // just increase ref count; hw_id = m_user_id_map.get_hw_id(stream->m_rx_check.m_pg_id); // can't fail if we got here } else { hw_id = m_hw_id_map.find_free_hw_id(); - if (hw_id == FREE_HW_ID) { + if (hw_id == HW_ID_FREE) { printf("Error: %s failed finding free hw_id\n", __func__); - return -1; + throw TrexException("Failed allocating statistic counter. Probably all are used."); } else { if (hw_id > m_max_hw_id) { m_max_hw_id = hw_id; @@ -557,19 +645,43 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream, uint16_t &ret_hw_id) { m_user_id_map.start_stream(user_id, hw_id); m_hw_id_map.map(hw_id, user_id); add_hw_rule(hw_id, m_user_id_map.l4_proto(user_id)); + // clear hardware counters. Just in case we have garbage from previous iteration + rx_per_flow_t rx_counter; + tx_per_flow_t tx_counter; + for (uint8_t port = 0; port < m_num_ports; port++) { + m_api->get_flow_stats(port, &rx_counter, (void *)&tx_counter, hw_id, hw_id, true); + } } } - parser.set_ip_id(IP_ID_RESERVE_BASE + hw_id); + m_parser->set_ip_id(IP_ID_RESERVE_BASE + hw_id); - ret_hw_id = hw_id; + // saving given hw_id on stream for use by tx statistics count + stream->m_rx_check.m_hw_id = hw_id; #ifdef __DEBUG_FUNC_ENTRY__ - std::cout << "exit:" << __METHOD_NAME__ << " hw_id:" << ret_hw_id << std::endl; + std::cout << "exit:" << __METHOD_NAME__ << " hw_id:" << hw_id << std::endl; + stream_dump(stream); #endif if (m_num_started_streams == 0) { send_start_stop_msg_to_rx(true); // First transmitting stream. Rx core should start reading packets; + + // wait to make sure that message is acknowledged. RX core might be in deep sleep mode, and we want to + // start transmitting packets only after it is working, otherwise, packets will get lost. + if (m_rx_core) { // in simulation, m_rx_core will be NULL + int count = 0; + while (!m_rx_core->is_working()) { + delay(1); + count++; + if (count == 100) { + throw TrexException("Critical error!! - RX core failed to start"); + } + } + } + } else { + // make sure rx core is working. If not, we got really confused somehow. + assert(m_rx_core->is_working()); } m_num_started_streams++; return 0; @@ -583,17 +695,25 @@ int CFlowStatRuleMgr::add_hw_rule(uint16_t hw_id, uint8_t proto) { return 0; } -int CFlowStatRuleMgr::stop_stream(const TrexStream * stream) { +int CFlowStatRuleMgr::stop_stream(TrexStream * stream) { #ifdef __DEBUG_FUNC_ENTRY__ std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl; + stream_dump(stream); #endif - if (no_stat_supported) - return -ENOTSUP; - if (! stream->m_rx_check.m_enabled) { return 0; } + if (! m_api) + throw TrexException("Called stop_stream, but no stream was added"); + + if (stream->m_rx_check.m_hw_id >= MAX_FLOW_STATS) { + // We allow stopping while already stopped. Will not hurt us. + return 0; + } + + stream->m_rx_check.m_hw_id = HW_ID_FREE; + if (m_user_id_map.stop_stream(stream->m_rx_check.m_pg_id) == 0) { // last stream associated with the entry stopped transmittig. // remove user_id <--> hw_id mapping @@ -601,7 +721,7 @@ int CFlowStatRuleMgr::stop_stream(const TrexStream * stream) { uint16_t hw_id = m_user_id_map.get_hw_id(stream->m_rx_check.m_pg_id); if (hw_id >= MAX_FLOW_STATS) { fprintf(stderr, "Error: %s got wrong hw_id %d from unmap\n", __func__, hw_id); - return -1; + throw TrexException("Internal error in stop_stream. Got bad hw_id"); } else { // update counters, and reset before unmapping CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map.get_user_id(hw_id)); diff --git a/src/flow_stat.h b/src/flow_stat.h index 83f076de..06b54d70 100644 --- a/src/flow_stat.h +++ b/src/flow_stat.h @@ -37,6 +37,8 @@ typedef std::map<uint32_t, uint16_t> flow_stat_map_t; typedef std::map<uint32_t, uint16_t>::iterator flow_stat_map_it_t; +class CRxCoreStateless; + class tx_per_flow_t_ { public: tx_per_flow_t_() { @@ -104,7 +106,7 @@ typedef class tx_per_flow_t_ tx_per_flow_t; typedef class tx_per_flow_t_ rx_per_flow_t; class CPhyEthIF; -class Cxl710Parser; +class CFlowStatParser; class CFlowStatUserIdInfo { public: @@ -198,16 +200,20 @@ class CFlowStatRuleMgr { }; CFlowStatRuleMgr(); + ~CFlowStatRuleMgr(); friend std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf); - int add_stream(const TrexStream * stream); - int del_stream(const TrexStream * stream); - int start_stream(TrexStream * stream, uint16_t &ret_hw_id); - int stop_stream(const TrexStream * stream); + void copy_state(TrexStream * from, TrexStream * to); + void init_stream(TrexStream * stream); + int add_stream(TrexStream * stream); + int del_stream(TrexStream * stream); + int start_stream(TrexStream * stream); + int stop_stream(TrexStream * stream); int get_active_pgids(flow_stat_active_t &result); bool dump_json(std::string & json, bool baseline); private: - int compile_stream(const TrexStream * stream, Cxl710Parser &parser); + void create(); + int compile_stream(const TrexStream * stream, CFlowStatParser *parser); int add_hw_rule(uint16_t hw_id, uint8_t proto); void send_start_stop_msg_to_rx(bool is_start); @@ -216,9 +222,12 @@ class CFlowStatRuleMgr { CFlowStatUserIdMap m_user_id_map; // map user ids to hw ids uint8_t m_num_ports; // How many ports are being used const TrexPlatformApi *m_api; + const CRxCoreStateless *m_rx_core; int m_max_hw_id; // max hw id we ever used uint32_t m_num_started_streams; // How many started (transmitting) streams we have CNodeRing *m_ring_to_rx; // handle for sending messages to Rx core + CFlowStatParser *m_parser; + uint16_t m_capabilities; }; #endif diff --git a/src/flow_stat_parser.cpp b/src/flow_stat_parser.cpp index 52824f73..8cb41fb7 100644 --- a/src/flow_stat_parser.cpp +++ b/src/flow_stat_parser.cpp @@ -25,38 +25,42 @@ #include <common/Network/Packet/EthernetHeader.h> #include <flow_stat_parser.h> -Cxl710Parser::Cxl710Parser() { - reset(); -} - -void Cxl710Parser::reset() { +void CFlowStatParser::reset() { m_ipv4 = 0; m_l4_proto = 0; - m_fdir_supported = false; + m_stat_supported = false; } -int Cxl710Parser::parse(uint8_t *p, uint16_t len) { +int CFlowStatParser::parse(uint8_t *p, uint16_t len) { EthernetHeader *ether = (EthernetHeader *)p; + int min_len = ETH_HDR_LEN + IPV4_HDR_LEN; + reset(); + + if (len < min_len) + return -1; switch( ether->getNextProtocol() ) { case EthernetHeader::Protocol::IP : - m_ipv4 = (IPHeader *)(p + 14); - m_fdir_supported = true; + m_ipv4 = (IPHeader *)(p + ETH_HDR_LEN); + m_stat_supported = true; break; case EthernetHeader::Protocol::VLAN : + min_len += 4; + if (len < min_len) + return -1; switch ( ether->getVlanProtocol() ){ case EthernetHeader::Protocol::IP: m_ipv4 = (IPHeader *)(p + 18); - m_fdir_supported = true; + m_stat_supported = true; break; default: - m_fdir_supported = false; + m_stat_supported = false; return -1; } break; default: - m_fdir_supported = false; + m_stat_supported = false; return -1; break; } @@ -64,7 +68,7 @@ int Cxl710Parser::parse(uint8_t *p, uint16_t len) { return 0; } -int Cxl710Parser::get_ip_id(uint16_t &ip_id) { +int CFlowStatParser::get_ip_id(uint16_t &ip_id) { if (! m_ipv4) return -1; @@ -73,18 +77,18 @@ int Cxl710Parser::get_ip_id(uint16_t &ip_id) { return 0; } -int Cxl710Parser::set_ip_id(uint16_t new_id) { +int CFlowStatParser::set_ip_id(uint16_t new_id) { if (! m_ipv4) return -1; // Updating checksum, not recalculating, so if someone put bad checksum on purpose, it will stay bad - m_ipv4->updateCheckSum(m_ipv4->getId(), PKT_NTOHS(new_id)); + m_ipv4->updateCheckSum(PKT_NTOHS(m_ipv4->getId()), PKT_NTOHS(new_id)); m_ipv4->setId(new_id); return 0; } -int Cxl710Parser::get_l4_proto(uint8_t &proto) { +int CFlowStatParser::get_l4_proto(uint8_t &proto) { if (! m_ipv4) return -1; @@ -96,7 +100,7 @@ int Cxl710Parser::get_l4_proto(uint8_t &proto) { static const uint16_t TEST_IP_ID = 0xabcd; static const uint8_t TEST_L4_PROTO = 0x11; -int Cxl710Parser::test() { +int CFlowStatParser::test() { uint16_t ip_id = 0; uint8_t l4_proto; uint8_t test_pkt[] = { @@ -107,7 +111,7 @@ int Cxl710Parser::test() { 0x0a, 0xbc, 0x08, 0x00, // vlan // IP header 0x45,0x02,0x00,0x30, - 0x00,0x00,0x40,0x00, + 0x01,0x02,0x40,0x00, 0xff, TEST_L4_PROTO, 0xbd,0x04, 0x10,0x0,0x0,0x1, 0x30,0x0,0x0,0x1, @@ -124,14 +128,37 @@ int Cxl710Parser::test() { assert(m_ipv4->isChecksumOK() == true); assert(get_l4_proto(l4_proto) == 0); assert(l4_proto == TEST_L4_PROTO); - assert(m_fdir_supported == true); + assert(m_stat_supported == true); reset(); // bad packet test_pkt[16] = 0xaa; assert (parse(test_pkt, sizeof(test_pkt)) == -1); - assert(m_fdir_supported == false); + assert(m_stat_supported == false); + + return 0; +} + +// In 82599 10G card we do not support VLANs +int C82599Parser::parse(uint8_t *p, uint16_t len) { + EthernetHeader *ether = (EthernetHeader *)p; + int min_len = ETH_HDR_LEN + IPV4_HDR_LEN; + reset(); + + if (len < min_len) + return -1; + + switch( ether->getNextProtocol() ) { + case EthernetHeader::Protocol::IP : + m_ipv4 = (IPHeader *)(p + ETH_HDR_LEN); + m_stat_supported = true; + break; + default: + m_stat_supported = false; + return -1; + break; + } return 0; } diff --git a/src/flow_stat_parser.h b/src/flow_stat_parser.h index 606a1bec..8c9e1418 100644 --- a/src/flow_stat_parser.h +++ b/src/flow_stat_parser.h @@ -19,19 +19,33 @@ limitations under the License. */ -class Cxl710Parser { +#ifndef __FLOW_STAT_PARSER_H__ +#define __FLOW_STAT_PARSER_H__ + +// Basic flow stat parser. Relevant for xl710/x710/x350 cards +#include "common/Network/Packet/IPHeader.h" + +class CFlowStatParser { public: - Cxl710Parser(); - void reset(); - int parse(uint8_t *pkt, uint16_t len); - bool is_fdir_supported() {return m_fdir_supported == true;}; - int get_ip_id(uint16_t &ip_id); - int set_ip_id(uint16_t ip_id); - int get_l4_proto(uint8_t &proto); - int test(); + virtual ~CFlowStatParser() {}; + virtual void reset(); + virtual int parse(uint8_t *pkt, uint16_t len); + virtual bool is_stat_supported() {return m_stat_supported == true;}; + virtual int get_ip_id(uint16_t &ip_id); + virtual int set_ip_id(uint16_t ip_id); + virtual int get_l4_proto(uint8_t &proto); + virtual int test(); - private: + protected: IPHeader *m_ipv4; - bool m_fdir_supported; + bool m_stat_supported; uint8_t m_l4_proto; }; + +class C82599Parser : public CFlowStatParser { + public: + ~C82599Parser() {}; + int parse(uint8_t *pkt, uint16_t len); +}; + +#endif diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp index c3dfcb95..a5cf3307 100644 --- a/src/gtest/trex_stateless_gtest.cpp +++ b/src/gtest/trex_stateless_gtest.cpp @@ -3581,7 +3581,7 @@ class rx_stat_pkt_parse : public testing::Test { TEST_F(rx_stat_pkt_parse, x710_parser) { - Cxl710Parser parser; + CFlowStatParser parser; parser.test(); } diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h index dbca5a8a..90eaa7c7 100644 --- a/src/internal_api/trex_platform_api.h +++ b/src/internal_api/trex_platform_api.h @@ -26,6 +26,7 @@ limitations under the License. #include <vector> #include <string> #include <string.h> +#include "flow_stat_parser.h" #include "trex_defs.h" /** @@ -34,6 +35,7 @@ limitations under the License. * @author imarom (06-Oct-15) */ + class TrexPlatformGlobalStats { public: TrexPlatformGlobalStats() { @@ -151,6 +153,7 @@ public: virtual bool get_promiscuous(uint8_t port_id) const = 0; virtual void flush_dp_messages() const = 0; virtual int get_active_pgids(flow_stat_active_t &result) const = 0; + virtual CFlowStatParser *get_flow_stat_parser() const = 0; virtual ~TrexPlatformApi() {} }; @@ -180,6 +183,7 @@ public: bool get_promiscuous(uint8_t port_id) const; void flush_dp_messages() const; int get_active_pgids(flow_stat_active_t &result) const; + CFlowStatParser *get_flow_stat_parser() const; }; @@ -241,6 +245,7 @@ public: void flush_dp_messages() const { } int get_active_pgids(flow_stat_active_t &result) const {return 0;} + CFlowStatParser *get_flow_stat_parser() const {return new CFlowStatParser();} private: int m_dp_core_count; diff --git a/src/latency.h b/src/latency.h index 3dd1cc36..f5f90cf9 100644 --- a/src/latency.h +++ b/src/latency.h @@ -22,6 +22,7 @@ See the License for the specific language governing permissions and limitations under the License. */ #include <bp_sim.h> +#include <flow_stat.h> #define L_PKT_SUBMODE_NO_REPLY 1 #define L_PKT_SUBMODE_REPLY 2 diff --git a/src/main.cpp b/src/main.cpp index 6a6b5721..3c68990c 100755 --- a/src/main.cpp +++ b/src/main.cpp @@ -253,6 +253,10 @@ TrexStateless * get_stateless_obj() { return m_sim_statelss_obj; } +CRxCoreStateless * get_rx_sl_core_obj() { + return NULL; +} + void set_stateless_obj(TrexStateless *obj) { m_sim_statelss_obj = obj; } diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index ee408c63..1f415958 100644 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -148,6 +148,7 @@ public: virtual int dump_fdir_global_stats(CPhyEthIF * _if, FILE *fd) { return -1;} virtual int get_stat_counters_num() {return 0;} virtual int get_rx_stat_capabilities() {return 0;} + virtual CFlowStatParser *get_flow_stat_parser(); }; @@ -281,6 +282,7 @@ public: virtual int wait_for_stable_link(); virtual int get_stat_counters_num() {return MAX_FLOW_STATS;} virtual int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;} + virtual CFlowStatParser *get_flow_stat_parser(); }; class CTRexExtendedDriverBase40G : public CTRexExtendedDriverBase10G { @@ -332,9 +334,12 @@ public: // disabling flow control on 40G using DPDK API causes the interface to malfunction virtual bool flow_control_disable_supported(){return false;} virtual bool hw_rx_stat_supported(){return true;} + virtual CFlowStatParser *get_flow_stat_parser(); + private: virtual void add_del_rules(enum rte_filter_op op, uint8_t port_id, uint16_t type, uint8_t ttl, uint16_t ip_id, int queue, uint16_t stat_idx); virtual int configure_rx_filter_rules_statfull(CPhyEthIF * _if); + private: uint8_t m_if_per_card; }; @@ -1231,6 +1236,10 @@ void CPhyEthIFStats::Dump(FILE *fd){ DP_A(rx_nombuf); } +// only on VM we have rx queues on DP cores +void CPhyEthIF::flush_dp_rx_queue(void) { +} + // Clear the RX queue of an interface, dropping all packets void CPhyEthIF::flush_rx_queue(void){ @@ -1735,8 +1744,8 @@ public: virtual int send_node(CGenNode * node); virtual void send_one_pkt(pkt_dir_t dir, rte_mbuf_t *m); + virtual void flush_dp_rx_queue(void); virtual int flush_tx_queue(void); - __attribute__ ((noinline)) void flush_rx_queue(); __attribute__ ((noinline)) void update_mac_addr(CGenNode * node,uint8_t *p); @@ -1804,6 +1813,11 @@ bool CCoreEthIF::Create(uint8_t core_id, return (true); } +// On VM, we get the packets in dp core, so just call general flush_rx_queue +void CCoreEthIF::flush_dp_rx_queue(void) { + flush_rx_queue(); +} + // This function is only relevant if we are in VM. In this case, we only have one rx queue. Can't have // rules to drop queue 0, and pass queue 1 to RX core, like in other cases. // We receive all packets in the same core that transmitted, and handle them to RX core. @@ -2699,7 +2713,7 @@ public: CFlowGenList m_fl; bool m_fl_was_init; volatile uint8_t m_signal[BP_MAX_CORES] __rte_cache_aligned ; // Signal to main core when DP thread finished - volatile bool m_rx_running; // Signal main core when RX thread finished + volatile bool m_sl_rx_running; // Signal main core when RX thread finished CLatencyManager m_mg; // statefull RX core CRxCoreStateless m_rx_sl; // stateless RX core CTrexGlobalIoMode m_io_modes; @@ -2793,7 +2807,9 @@ void CGlobalTRex::try_stop_all_cores(){ TrexStatelessDpQuit * dp_msg= new TrexStatelessDpQuit(); TrexStatelessRxQuit * rx_msg= new TrexStatelessRxQuit(); send_message_all_dp(dp_msg); - send_message_to_rx(rx_msg); + if (get_is_stateless()) { + send_message_to_rx(rx_msg); + } delete dp_msg; // no need to delete rx_msg. Deleted by receiver bool all_core_finished = false; @@ -3804,16 +3820,16 @@ int CGlobalTRex::run_in_master() { int CGlobalTRex::run_in_rx_core(void){ if (get_is_stateless()) { - m_rx_running = true; + m_sl_rx_running = true; m_rx_sl.start(); + m_sl_rx_running = false; } else { if ( CGlobalInfo::m_options.is_rx_enabled() ){ - m_rx_running = true; + m_sl_rx_running = false; m_mg.start(0); } } - m_rx_running = false; return (0); } @@ -3905,7 +3921,7 @@ bool CGlobalTRex::is_all_cores_finished() { return false; } } - if (m_rx_running) + if (m_sl_rx_running) return false; return true; @@ -4116,11 +4132,14 @@ bool CCoreEthIF::process_rx_pkt(pkt_dir_t dir, return (send); } - TrexStateless * get_stateless_obj() { return g_trex.m_trex_stateless; } +CRxCoreStateless * get_rx_sl_core_obj() { + return &g_trex.m_rx_sl; +} + static int latency_one_lcore(__attribute__((unused)) void *dummy) { CPlatformSocketInfo * lpsock=&CGlobalInfo::m_socket; @@ -4274,12 +4293,19 @@ int core_mask_sanity(uint32_t wanted_core_mask) { wanted_core_num = num_set_bits(wanted_core_mask); calc_core_num = num_set_bits(calc_core_mask); + if (calc_core_num == 1) { + printf ("Error: You have only 1 core available. Minimum configuration requires 2 cores\n"); + printf(" If you are running on VM, consider adding more cores if possible\n"); + return -1; + } if (wanted_core_num > calc_core_num) { printf("Error: You have %d threads available, but you asked for %d threads.\n", calc_core_num, wanted_core_num); printf(" Calculation is: -c <num>(%d) * dual ports (%d) + 1 master thread %s" , CGlobalInfo::m_options.preview.getCores(), CGlobalInfo::m_options.get_expected_dual_ports() , get_is_rx_thread_enabled() ? "+1 latency thread (because of -l flag)\n" : "\n"); - printf(" Maybe try smaller -c <num>.\n"); + if (CGlobalInfo::m_options.preview.getCores() > 1) + printf(" Maybe try smaller -c <num>.\n"); + printf(" If you are running on VM, consider adding more cores if possible\n"); return -1; } @@ -4483,7 +4509,7 @@ int main_test(int argc , char * argv[]){ g_trex.reset_counters(); } - g_trex.m_rx_running = false; + g_trex.m_sl_rx_running = false; if ( get_is_stateless() ) { g_trex.start_master_stateless(); @@ -4537,6 +4563,12 @@ int CTRexExtendedDriverBase::configure_drop_queue(CPhyEthIF * _if) { return (rte_eth_dev_rx_queue_stop(port_id, 0)); } +CFlowStatParser *CTRexExtendedDriverBase::get_flow_stat_parser() { + CFlowStatParser *parser = new CFlowStatParser(); + assert (parser); + return parser; +} + void wait_x_sec(int sec) { int i; printf(" wait %d sec ", sec); @@ -4940,6 +4972,12 @@ int CTRexExtendedDriverBase10G::wait_for_stable_link(){ return (0); } +CFlowStatParser *CTRexExtendedDriverBase10G::get_flow_stat_parser() { + CFlowStatParser *parser = new C82599Parser(); + assert (parser); + return parser; +} + //////////////////////////////////////////////////////////////////////////////// void CTRexExtendedDriverBase40G::clear_extended_stats(CPhyEthIF * _if){ rte_eth_stats_reset(_if->get_port_id()); @@ -5167,6 +5205,12 @@ int CTRexExtendedDriverBase40G::wait_for_stable_link(){ return (0); } +CFlowStatParser *CTRexExtendedDriverBase40G::get_flow_stat_parser() { + CFlowStatParser *parser = new CFlowStatParser(); + assert (parser); + return parser; +} + ///////////////////////////////////////////////////////////////////// @@ -5407,3 +5451,8 @@ void TrexDpdkPlatformApi::flush_dp_messages() const { int TrexDpdkPlatformApi::get_active_pgids(flow_stat_active_t &result) const { return g_trex.m_trex_stateless->m_rx_flow_stat.get_active_pgids(result); } + +CFlowStatParser *TrexDpdkPlatformApi::get_flow_stat_parser() const { + return CTRexExtendedDriverDb::Ins()->get_drv() + ->get_flow_stat_parser(); +} diff --git a/src/main_dpdk.h b/src/main_dpdk.h index ff1ea784..a9bfed39 100644 --- a/src/main_dpdk.h +++ b/src/main_dpdk.h @@ -122,6 +122,7 @@ class CPhyEthIF { CPhyEthIFStats & get_stats(){ return ( m_stats ); } + void flush_dp_rx_queue(void); void flush_rx_queue(void); int add_rx_flow_stat_rule(uint8_t type, uint16_t proto, uint16_t id); int del_rx_flow_stat_rule(uint8_t type, uint16_t proto, uint16_t id); diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index f054c0ed..f7a23188 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -38,6 +38,50 @@ limitations under the License. using namespace std; /** + * API sync + */ +trex_rpc_cmd_rc_e +TrexRpcCmdAPISync::_run(const Json::Value ¶ms, Json::Value &result) { + const Json::Value &api_vers = parse_array(params, "api_vers", result); + + Json::Value api_ver_rc = Json::arrayValue; + + /* for every element in the list - generate the appropirate API handler */ + for (const auto api_ver : api_vers) { + Json::Value single_rc; + + /* only those are supported */ + const std::string type = parse_choice(api_ver, "type", {"core"}, result); + + int major = parse_int(api_ver, "major", result); + int minor = parse_int(api_ver, "minor", result); + APIClass::type_e api_type; + + /* decode type of API */ + if (type == "core") { + api_type = APIClass::API_CLASS_TYPE_CORE; + } + + single_rc["type"] = type; + + /* this section might throw exception in case versions do not match */ + try { + single_rc["api_h"] = get_stateless_obj()->verify_api(api_type, major, minor); + + } catch (const TrexAPIException &e) { + generate_execute_err(result, e.what()); + } + + /* add to the response */ + api_ver_rc.append(single_rc); + } + + result["result"]["api_vers"] = api_ver_rc; + + return (TREX_RPC_CMD_OK); +} + +/** * ping command */ trex_rpc_cmd_rc_e diff --git a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp index 68bebeb6..40719325 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp @@ -545,7 +545,11 @@ TrexRpcCmdStartTraffic::_run(const Json::Value ¶ms, Json::Value &result) { std::string type = parse_choice(mul_obj, "type", TrexPortMultiplier::g_types, result); std::string op = parse_string(mul_obj, "op", result); double value = parse_double(mul_obj, "value", result); - + + if ( value <=0 ){ + generate_parse_err(result, "multiplier can't be zero"); + } + if (op != "abs") { generate_parse_err(result, "start message can only specify absolute speed rate"); } @@ -586,6 +590,27 @@ TrexRpcCmdStopTraffic::_run(const Json::Value ¶ms, Json::Value &result) { } /*************************** + * remove all hardware filters + * + **************************/ +trex_rpc_cmd_rc_e +TrexRpcCmdRemoveRXFilters::_run(const Json::Value ¶ms, Json::Value &result) { + + uint8_t port_id = parse_port(params, result); + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); + + try { + port->remove_rx_filters(); + } catch (const TrexException &ex) { + generate_execute_err(result, ex.what()); + } + + result["result"] = Json::objectValue; + + return (TREX_RPC_CMD_OK); +} + +/*************************** * get all streams * **************************/ diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h index c4b01b85..428bdd7b 100644 --- a/src/rpc-server/commands/trex_rpc_cmds.h +++ b/src/rpc-server/commands/trex_rpc_cmds.h @@ -36,33 +36,39 @@ class TrexStream; * syntactic sugar for creating a simple command */ -#define TREX_RPC_CMD_DEFINE_EXTENDED(class_name, cmd_name, param_count, needs_ownership, ext) \ - class class_name : public TrexRpcCommand { \ - public: \ - class_name () : TrexRpcCommand(cmd_name, param_count, needs_ownership) {} \ - protected: \ - virtual trex_rpc_cmd_rc_e _run(const Json::Value ¶ms, Json::Value &result); \ - ext \ +#define TREX_RPC_CMD_DEFINE_EXTENDED(class_name, cmd_name, param_count, needs_ownership, api_type, ext) \ + class class_name : public TrexRpcCommand { \ + public: \ + class_name () : TrexRpcCommand(cmd_name, param_count, needs_ownership, api_type) {} \ + protected: \ + virtual trex_rpc_cmd_rc_e _run(const Json::Value ¶ms, Json::Value &result); \ + ext \ } -#define TREX_RPC_CMD_DEFINE(class_name, cmd_name, param_count, needs_ownership) TREX_RPC_CMD_DEFINE_EXTENDED(class_name, cmd_name, param_count, needs_ownership, ;) +#define TREX_RPC_CMD_DEFINE(class_name, cmd_name, param_count, needs_ownership, api_type) TREX_RPC_CMD_DEFINE_EXTENDED(class_name, cmd_name, param_count, needs_ownership, api_type, ;) /** * test cmds */ -TREX_RPC_CMD_DEFINE(TrexRpcCmdTestAdd, "test_add", 2, false); -TREX_RPC_CMD_DEFINE(TrexRpcCmdTestSub, "test_sub", 2, false); +TREX_RPC_CMD_DEFINE(TrexRpcCmdTestAdd, "test_add", 2, false, APIClass::API_CLASS_TYPE_NO_API); +TREX_RPC_CMD_DEFINE(TrexRpcCmdTestSub, "test_sub", 2, false, APIClass::API_CLASS_TYPE_NO_API); + +/** + * api_sync command always present and valid and also ping.... + */ +TREX_RPC_CMD_DEFINE(TrexRpcCmdAPISync, "api_sync", 1, false, APIClass::API_CLASS_TYPE_NO_API); +TREX_RPC_CMD_DEFINE(TrexRpcCmdPing, "ping", 0, false, APIClass::API_CLASS_TYPE_NO_API); /** * general cmds */ -TREX_RPC_CMD_DEFINE(TrexRpcCmdPing, "ping", 0, false); -TREX_RPC_CMD_DEFINE(TrexRpcPublishNow, "publish_now", 2, false); -TREX_RPC_CMD_DEFINE(TrexRpcCmdGetCmds, "get_supported_cmds", 0, false); -TREX_RPC_CMD_DEFINE(TrexRpcCmdGetVersion, "get_version", 0, false); -TREX_RPC_CMD_DEFINE(TrexRpcCmdGetActivePGIds, "get_active_pgids",0, false); -TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdGetSysInfo, "get_system_info", 0, false, +TREX_RPC_CMD_DEFINE(TrexRpcPublishNow, "publish_now", 2, false, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdGetCmds, "get_supported_cmds", 0, false, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdGetVersion, "get_version", 0, false, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdGetActivePGIds, "get_active_pgids", 0, false, APIClass::API_CLASS_TYPE_CORE); + +TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdGetSysInfo, "get_system_info", 0, false, APIClass::API_CLASS_TYPE_CORE, std::string get_cpu_model(); void get_hostname(std::string &hostname); @@ -72,25 +78,25 @@ void get_hostname(std::string &hostname); /** * ownership */ -TREX_RPC_CMD_DEFINE(TrexRpcCmdGetOwner, "get_owner", 1, false); -TREX_RPC_CMD_DEFINE(TrexRpcCmdAcquire, "acquire", 4, false); -TREX_RPC_CMD_DEFINE(TrexRpcCmdRelease, "release", 1, true); +TREX_RPC_CMD_DEFINE(TrexRpcCmdGetOwner, "get_owner", 1, false, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdAcquire, "acquire", 4, false, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdRelease, "release", 1, true, APIClass::API_CLASS_TYPE_CORE); /** * port commands */ -TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortStats, "get_port_stats", 1, false); -TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortStatus, "get_port_status", 1, false); -TREX_RPC_CMD_DEFINE(TrexRpcCmdSetPortAttr, "set_port_attr", 3, false); +TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortStats, "get_port_stats", 1, false, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortStatus, "get_port_status", 1, false, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdSetPortAttr, "set_port_attr", 3, false, APIClass::API_CLASS_TYPE_CORE); /** * stream cmds */ -TREX_RPC_CMD_DEFINE(TrexRpcCmdRemoveAllStreams, "remove_all_streams", 1, true); -TREX_RPC_CMD_DEFINE(TrexRpcCmdRemoveStream, "remove_stream", 2, true); +TREX_RPC_CMD_DEFINE(TrexRpcCmdRemoveAllStreams, "remove_all_streams", 1, true, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdRemoveStream, "remove_stream", 2, true, APIClass::API_CLASS_TYPE_CORE); -TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdAddStream, "add_stream", 3, true, +TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdAddStream, "add_stream", 3, true, APIClass::API_CLASS_TYPE_CORE, /* extended part */ std::unique_ptr<TrexStream> allocate_new_stream(const Json::Value §ion, uint8_t port_id, uint32_t stream_id, Json::Value &result); @@ -107,20 +113,22 @@ void parse_vm_instr_write_mask_flow_var(const Json::Value &inst, std::unique_ptr ); -TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStreamList, "get_stream_list", 1, false); -TREX_RPC_CMD_DEFINE(TrexRpcCmdGetAllStreams, "get_all_streams", 1, false); +TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStreamList, "get_stream_list", 1, false, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdGetAllStreams, "get_all_streams", 1, false, APIClass::API_CLASS_TYPE_CORE); -TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStream, "get_stream", 3, false); +TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStream, "get_stream", 3, false, APIClass::API_CLASS_TYPE_CORE); -TREX_RPC_CMD_DEFINE(TrexRpcCmdStartTraffic, "start_traffic", 4, true); -TREX_RPC_CMD_DEFINE(TrexRpcCmdStopTraffic, "stop_traffic", 1, true); -TREX_RPC_CMD_DEFINE(TrexRpcCmdPauseTraffic, "pause_traffic", 1, true); -TREX_RPC_CMD_DEFINE(TrexRpcCmdResumeTraffic, "resume_traffic", 1, true); +TREX_RPC_CMD_DEFINE(TrexRpcCmdStartTraffic, "start_traffic", 4, true, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdStopTraffic, "stop_traffic", 1, true, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdRemoveRXFilters, "remove_rx_filters", 1, true, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdPauseTraffic, "pause_traffic", 1, true, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdResumeTraffic, "resume_traffic", 1, true, APIClass::API_CLASS_TYPE_CORE); -TREX_RPC_CMD_DEFINE(TrexRpcCmdUpdateTraffic, "update_traffic", 3, true); +TREX_RPC_CMD_DEFINE(TrexRpcCmdUpdateTraffic, "update_traffic", 3, true, APIClass::API_CLASS_TYPE_CORE); -TREX_RPC_CMD_DEFINE(TrexRpcCmdValidate, "validate", 2, false); +TREX_RPC_CMD_DEFINE(TrexRpcCmdValidate, "validate", 2, false, APIClass::API_CLASS_TYPE_CORE); #endif /* __TREX_RPC_CMD_H__ */ + diff --git a/src/rpc-server/trex_rpc_cmd.cpp b/src/rpc-server/trex_rpc_cmd.cpp index caf161e3..902e63c7 100644 --- a/src/rpc-server/trex_rpc_cmd.cpp +++ b/src/rpc-server/trex_rpc_cmd.cpp @@ -23,6 +23,32 @@ limitations under the License. #include <trex_stateless.h> #include <trex_stateless_port.h> +/** + * method name and params + * + */ +TrexRpcCommand::TrexRpcCommand(const std::string &method_name, + int param_count, + bool needs_ownership, + APIClass::type_e type) : m_name(method_name), + m_param_count(param_count), + m_needs_ownership(needs_ownership) { + + /* if needs ownership - another field is needed (handler) */ + if (m_needs_ownership) { + m_param_count++; + } + + /* API verification */ + m_api_type = type; + + if (type != APIClass::API_CLASS_TYPE_NO_API) { + m_api_handler = get_stateless_obj()->get_api_handler(type); + m_param_count++; + } + +} + trex_rpc_cmd_rc_e TrexRpcCommand::run(const Json::Value ¶ms, Json::Value &result) { trex_rpc_cmd_rc_e rc; @@ -30,12 +56,18 @@ TrexRpcCommand::run(const Json::Value ¶ms, Json::Value &result) { /* the internal run can throw a parser error / other error */ try { - check_param_count(params, m_param_count, result); + /* verify API handler is correct (version mismatch) */ + if ( (m_api_type != APIClass::API_CLASS_TYPE_NO_API) && !g_test_override_api ) { + verify_api_handler(params, result); + } + /* verify ownership */ if (m_needs_ownership && !g_test_override_ownership) { verify_ownership(params, result); } + check_param_count(params, m_param_count, result); + /* run the command itself*/ rc = _run(params, result); @@ -72,6 +104,17 @@ TrexRpcCommand::verify_ownership(const Json::Value ¶ms, Json::Value &result) } } +void +TrexRpcCommand::verify_api_handler(const Json::Value ¶ms, Json::Value &result) { + std::string api_handler = parse_string(params, "api_h", result); + + if (m_api_handler != api_handler) { + std::stringstream ss; + ss << "API verification failed - API handler provided mismatch for class: '" << APIClass::type_to_name(m_api_type) << "'"; + generate_execute_err(result, ss.str()); + } +} + uint8_t TrexRpcCommand::parse_port(const Json::Value ¶ms, Json::Value &result) { uint8_t port_id = parse_byte(params, "port_id", result); @@ -281,3 +324,4 @@ TrexRpcCommand::generate_execute_err(Json::Value &result, const std::string &msg * by default this is off */ bool TrexRpcCommand::g_test_override_ownership = false; +bool TrexRpcCommand::g_test_override_api = false; diff --git a/src/rpc-server/trex_rpc_cmd_api.h b/src/rpc-server/trex_rpc_cmd_api.h index 7e694768..25920c6c 100644 --- a/src/rpc-server/trex_rpc_cmd_api.h +++ b/src/rpc-server/trex_rpc_cmd_api.h @@ -27,6 +27,8 @@ limitations under the License. #include <json/json.h> #include <trex_rpc_exception_api.h> +#include "trex_api_class.h" + /** * describe different types of rc for run() */ @@ -68,16 +70,10 @@ public: /** * method name and params */ - TrexRpcCommand(const std::string &method_name, int param_count, bool needs_ownership) : - m_name(method_name), - m_param_count(param_count), - m_needs_ownership(needs_ownership) { - - /* if needs ownership - another field is needed (handler) */ - if (m_needs_ownership) { - m_param_count++; - } - } + TrexRpcCommand(const std::string &method_name, + int param_count, + bool needs_ownership, + APIClass::type_e type); /** * entry point for executing RPC command @@ -99,6 +95,10 @@ public: g_test_override_ownership = enable; } + static void test_set_override_api(bool enable) { + g_test_override_api = enable; + } + virtual ~TrexRpcCommand() {} protected: @@ -131,11 +131,18 @@ protected: void check_param_count(const Json::Value ¶ms, int expected, Json::Value &result); /** + * verify API handler + * + */ + void verify_api_handler(const Json::Value ¶ms, Json::Value &result); + + /** * verify ownership * */ void verify_ownership(const Json::Value ¶ms, Json::Value &result); + /** * validate port id * @@ -360,11 +367,13 @@ protected: const char * json_type_to_name(const Json::Value &value); /* RPC command name */ - std::string m_name; - int m_param_count; - bool m_needs_ownership; - - static bool g_test_override_ownership; + std::string m_name; + int m_param_count; + bool m_needs_ownership; + std::string m_api_handler; + APIClass::type_e m_api_type; + static bool g_test_override_ownership; + static bool g_test_override_api; }; #endif /* __TREX_RPC_CMD_API_H__ */ diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp index e1bd3eee..924503f2 100644 --- a/src/rpc-server/trex_rpc_cmds_table.cpp +++ b/src/rpc-server/trex_rpc_cmds_table.cpp @@ -33,6 +33,7 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() { /* general */ + register_command(new TrexRpcCmdAPISync()); register_command(new TrexRpcCmdPing()); register_command(new TrexRpcPublishNow()); register_command(new TrexRpcCmdGetCmds()); @@ -61,6 +62,8 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() { register_command(new TrexRpcCmdResumeTraffic()); register_command(new TrexRpcCmdUpdateTraffic()); + register_command(new TrexRpcCmdRemoveRXFilters()); + register_command(new TrexRpcCmdValidate()); } diff --git a/src/rpc-server/trex_rpc_exception_api.h b/src/rpc-server/trex_rpc_exception_api.h index e349b980..ebc9b411 100644 --- a/src/rpc-server/trex_rpc_exception_api.h +++ b/src/rpc-server/trex_rpc_exception_api.h @@ -25,17 +25,19 @@ limitations under the License. #include <string> #include <stdexcept> +#include "trex_exception.h" + /** * generic exception for RPC errors * */ -class TrexRpcException : public std::runtime_error -{ +class TrexRpcException : public TrexException { + public: - TrexRpcException() : std::runtime_error("") { + TrexRpcException() : TrexException("") { } - TrexRpcException(const std::string &what) : std::runtime_error(what) { + TrexRpcException(const std::string &what) : TrexException(what) { } }; diff --git a/src/sim/trex_sim_stateless.cpp b/src/sim/trex_sim_stateless.cpp index ffe377f4..fa13401d 100644 --- a/src/sim/trex_sim_stateless.cpp +++ b/src/sim/trex_sim_stateless.cpp @@ -117,6 +117,7 @@ SimStateless::SimStateless() { /* override ownership checks */ TrexRpcCommand::test_set_override_ownership(true); + TrexRpcCommand::test_set_override_api(true); } diff --git a/src/stateless/cp/trex_api_class.h b/src/stateless/cp/trex_api_class.h new file mode 100644 index 00000000..78933d23 --- /dev/null +++ b/src/stateless/cp/trex_api_class.h @@ -0,0 +1,110 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_API_CLASS_H__ +#define __TREX_API_CLASS_H__ + +#include <assert.h> + +#include "common/basic_utils.h" +#include "trex_exception.h" + +/** + * API exception + * + * @author imarom (03-Apr-16) + */ +class TrexAPIException : public TrexException { +public: + TrexAPIException(const std::string &what) : TrexException(what) { + } +}; + +/** + * define an API class + * + * @author imarom (03-Apr-16) + */ +class APIClass { +public: + + enum type_e { + API_CLASS_TYPE_CORE = 0, + API_CLASS_TYPE_MAX, + + API_CLASS_TYPE_NO_API + }; + + static const char * type_to_name(type_e type) { + switch (type) { + case API_CLASS_TYPE_CORE: + return "core"; + default: + assert(0); + } + } + + APIClass() { + /* invalid */ + m_type = API_CLASS_TYPE_MAX; + } + + void init(type_e type, int major, int minor) { + m_type = type; + m_major = major; + m_minor = minor; + + unsigned int seed = time(NULL); + m_handler = utl_generate_random_str(seed, 8); + } + + std::string & verify_api(int major, int minor) { + std::stringstream ss; + ss << "API type '" << type_to_name(m_type) << "': "; + + assert(m_type < API_CLASS_TYPE_MAX); + + /* for now a simple major check */ + if (major < m_major) { + ss << "server has a major newer API version - server: '" << m_major << "', client: '" << major << "'"; + throw TrexAPIException(ss.str()); + } + + if (major > m_major) { + ss << "server has an older API version - server: '" << m_major << "', client: '" << major << "'"; + throw TrexAPIException(ss.str()); + } + + return get_api_handler(); + } + + std::string & get_api_handler() { + return m_handler; + } + +private: + type_e m_type; + int m_major; + int m_minor; + std::string m_handler; + +}; + +#endif /* __TREX_API_CLASS_H__ */ diff --git a/src/stateless/cp/trex_exception.h b/src/stateless/cp/trex_exception.h new file mode 100644 index 00000000..b9e20761 --- /dev/null +++ b/src/stateless/cp/trex_exception.h @@ -0,0 +1,41 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_EXCEPTION_H__ +#define __TREX_EXCEPTION_H__ + +#include <stdexcept> +#include <string> + +/** + * generic exception for errors + * TODO: move this to a better place + */ +class TrexException : public std::runtime_error +{ +public: + TrexException() : std::runtime_error("") { + + } + TrexException(const std::string &what) : std::runtime_error(what) { + } +}; + +#endif /* __TREX_EXCEPTION_H__ */ diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index 9df57a50..f6f81b96 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -53,6 +53,8 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) { m_platform_api = cfg.m_platform_api; m_publisher = cfg.m_publisher; + /* API core version */ + m_api_classes[APIClass::API_CLASS_TYPE_CORE].init(APIClass::API_CLASS_TYPE_CORE, 1, 0); } /** @@ -175,3 +177,4 @@ TrexStateless::generate_publish_snapshot(std::string &snapshot) { snapshot = writer.write(root); } + diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index 6e5e0c44..b506da61 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -27,27 +27,18 @@ limitations under the License. #include <mutex> -#include <trex_stream.h> -#include <trex_stateless_port.h> -#include <trex_rpc_server_api.h> -#include <publisher/trex_publisher.h> +#include "trex_stream.h" +#include "trex_stateless_port.h" +#include "trex_rpc_server_api.h" -#include <flow_stat.h> -#include <internal_api/trex_platform_api.h> +#include "publisher/trex_publisher.h" +#include "internal_api/trex_platform_api.h" -/** - * generic exception for errors - * TODO: move this to a better place - */ -class TrexException : public std::runtime_error -{ -public: - TrexException() : std::runtime_error("") { +#include "flow_stat.h" - } - TrexException(const std::string &what) : std::runtime_error(what) { - } -}; + +#include "trex_exception.h" +#include "trex_api_class.h" class TrexStatelessPort; @@ -81,6 +72,7 @@ public: } m_stats; }; + /** * config object for stateless object * @@ -167,6 +159,14 @@ public: return m_rpc_server; } + const std::string & verify_api(APIClass::type_e type, int major, int minor) { + return m_api_classes[type].verify_api(major, minor); + } + + const std::string & get_api_handler(APIClass::type_e type) { + return m_api_classes[type].get_api_handler(); + } + CFlowStatRuleMgr m_rx_flow_stat; protected: @@ -187,6 +187,8 @@ protected: TrexPublisher *m_publisher; + /* API */ + APIClass m_api_classes[APIClass::API_CLASS_TYPE_MAX]; }; /** @@ -197,6 +199,7 @@ protected: * @return TrexStateless& */ TrexStateless * get_stateless_obj(); +CRxCoreStateless * get_rx_sl_core_obj(); #endif /* __TREX_STATELESS_H__ */ diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 90589d7a..2239f3f6 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -272,6 +272,22 @@ TrexStatelessPort::stop_traffic(void) { } /** + * remove all RX filters from port + * + * @author imarom (28-Mar-16) + */ +void +TrexStatelessPort::remove_rx_filters(void) { + /* only valid when IDLE or with streams and not TXing */ + verify_state(PORT_STATE_STREAMS); + + for (auto entry : m_stream_table) { + get_stateless_obj()->m_rx_flow_stat.stop_stream(entry.second); + } + +} + +/** * when a port stops, perform various actions * */ @@ -287,9 +303,6 @@ TrexStatelessPort::common_port_stop_actions(bool async) { get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data); } - for (auto entry : m_stream_table) { - get_stateless_obj()->m_rx_flow_stat.stop_stream(entry.second); - } } void @@ -768,26 +781,5 @@ TrexPortOwner::TrexPortOwner() { m_seed = time(NULL); } -/** - * generate a random connection handler - * - */ -std::string -TrexPortOwner::generate_handler() { - std::stringstream ss; - - static const char alphanum[] = - "0123456789" - "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - "abcdefghijklmnopqrstuvwxyz"; - - /* generate 8 bytes of random handler */ - for (int i = 0; i < 8; ++i) { - ss << alphanum[rand_r(&m_seed) % (sizeof(alphanum) - 1)]; - } - - return (ss.str()); -} - const std::string TrexPortOwner::g_unowned_name = "<FREE>"; const std::string TrexPortOwner::g_unowned_handler = ""; diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index 7e1838d4..2167e735 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -21,6 +21,7 @@ limitations under the License. #ifndef __TREX_STATELESS_PORT_H__ #define __TREX_STATELESS_PORT_H__ +#include "common/basic_utils.h" #include "internal_api/trex_platform_api.h" #include "trex_dp_port_events.h" #include "trex_stream.h" @@ -65,7 +66,7 @@ public: m_owner_name = owner_name; /* internal data */ - m_handler = generate_handler(); + m_handler = utl_generate_random_str(m_seed, 8); m_is_free = false; } @@ -83,7 +84,6 @@ public: private: - std::string generate_handler(); /* is this port owned by someone ? */ bool m_is_free; @@ -178,6 +178,14 @@ public: void stop_traffic(void); /** + * remove all RX filters + * valid only when port is stopped + * + * @author imarom (28-Mar-16) + */ + void remove_rx_filters(void); + + /** * pause traffic * throws TrexException in case of an error */ diff --git a/src/stateless/cp/trex_stream.cpp b/src/stateless/cp/trex_stream.cpp index 9c7898a8..e3f0ba7c 100644 --- a/src/stateless/cp/trex_stream.cpp +++ b/src/stateless/cp/trex_stream.cpp @@ -106,6 +106,15 @@ void TrexStream::Dump(FILE *fd){ } } + if (m_rx_check.m_enabled) { + fprintf(fd, " Flow stat enabled:\n"); + fprintf(fd, " seq check %s latency check %s packet group id %d hw_id %d\n" + , m_rx_check.m_seq_enabled ? "enabled":"disabled" + , m_rx_check.m_latency ? "enabled":"disabled", m_rx_check.m_pg_id, m_rx_check.m_hw_id + ); + } else { + fprintf(fd, " Flow stat disabled\n"); + } fprintf(fd," rate :\n\n"); fprintf(fd," pps : %f\n", m_rate.get_pps()); diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp index 563236c2..d6971d68 100644 --- a/src/stateless/cp/trex_streams_compiler.cpp +++ b/src/stateless/cp/trex_streams_compiler.cpp @@ -477,8 +477,10 @@ TrexStreamsCompiler::compile_stream(TrexStream *stream, TrexStream *fixed_rx_flow_stat_stream = stream->clone(true); - // not checking for errors. We assume that if add_stream succeeded, start_stream will too. - get_stateless_obj()->m_rx_flow_stat.start_stream(fixed_rx_flow_stat_stream, fixed_rx_flow_stat_stream->m_rx_check.m_hw_id); + get_stateless_obj()->m_rx_flow_stat.start_stream(fixed_rx_flow_stat_stream); + // CFlowStatRuleMgr keeps state of the stream object. We duplicated the stream here (in order not + // change the packet kept in the stream). We want the state to be saved in the original stream. + get_stateless_obj()->m_rx_flow_stat.copy_state(fixed_rx_flow_stat_stream, stream); /* can this stream be split to many cores ? */ if (!stream->is_splitable(dp_core_count)) { diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index ba25f61d..f125a46a 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -399,6 +399,7 @@ TrexStatelessDpCore::idle_state_loop() { int counter = 0; while (m_state == STATE_IDLE) { + m_core->m_node_gen.m_v_if->flush_dp_rx_queue(); bool had_msg = periodic_check_for_cp_messages(); if (had_msg) { counter = 0; diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index ab7c08d1..26f537f8 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -2,6 +2,7 @@ #include "bp_sim.h" #include "flow_stat_parser.h" #include "latency.h" +#include "pal/linux/sanb_atomic.h" #include "trex_stateless_messaging.h" #include "trex_stateless_rx_core.h" @@ -59,6 +60,8 @@ void CRxCoreStateless::idle_state_loop() { if (had_msg) { counter = 0; continue; + } else { + flush_rx(); } /* enter deep sleep only if enough time had passed */ @@ -72,8 +75,8 @@ void CRxCoreStateless::idle_state_loop() { } void CRxCoreStateless::start() { - static int count = 0; - static int i = 0; + int count = 0; + int i = 0; bool do_try_rx_queue =CGlobalInfo::m_options.preview.get_vm_one_queue_enable() ? true : false; while (true) { @@ -91,7 +94,11 @@ void CRxCoreStateless::start() { } else { if (m_state == STATE_QUIT) break; + count = 0; + i = 0; + set_working_msg_ack(false); idle_state_loop(); + set_working_msg_ack(true); } if (do_try_rx_queue) { try_rx_queues(); @@ -101,7 +108,7 @@ void CRxCoreStateless::start() { } void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPort *lp, rte_mbuf_t *m) { - Cxl710Parser parser; + CFlowStatParser parser; if (parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) { uint16_t ip_id; @@ -162,6 +169,30 @@ void CRxCoreStateless::try_rx_queues() { } } +// exactly the same as try_rx, without the handle_rx_pkt +// purpose is to flush rx queues when core is in idle state +void CRxCoreStateless::flush_rx() { + rte_mbuf_t * rx_pkts[64]; + int i, total_pkts = 0; + for (i = 0; i < m_max_ports; i++) { + CLatencyManagerPerPort * lp = &m_ports[i]; + rte_mbuf_t * m; + m_cpu_dp_u.start_work(); + /* try to read 64 packets clean up the queue */ + uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64); + total_pkts += cnt_p; + if (cnt_p) { + int j; + for (j = 0; j < cnt_p; j++) { + m = rx_pkts[j]; + rte_pktmbuf_free(m); + } + /* commit only if there was work to do ! */ + m_cpu_dp_u.commit(); + }/* if work */ + }// all ports +} + int CRxCoreStateless::try_rx() { rte_mbuf_t * rx_pkts[64]; int i, total_pkts = 0; @@ -211,6 +242,12 @@ int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int return 0; } +void CRxCoreStateless::set_working_msg_ack(bool val) { + sanb_smp_memory_barrier(); + m_ack_start_work_msg = val; + sanb_smp_memory_barrier(); +} + double CRxCoreStateless::get_cpu_util() { m_cpu_cp_u.Update(); return m_cpu_cp_u.GetVal(); diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index 5ab12f4e..b78256c2 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -54,6 +54,8 @@ class CRxCoreStateless { void work() {m_state = STATE_WORKING;} void idle() {m_state = STATE_IDLE;} void quit() {m_state = STATE_QUIT;} + bool is_working() const {return (m_ack_start_work_msg == true);} + void set_working_msg_ack(bool val); double get_cpu_util(); private: @@ -62,6 +64,7 @@ class CRxCoreStateless { void idle_state_loop(); void handle_rx_pkt(CLatencyManagerPerPort * lp, rte_mbuf_t * m); void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r); + void flush_rx(); int try_rx(); void try_rx_queues(); bool is_flow_stat_id(uint16_t id); @@ -71,10 +74,13 @@ class CRxCoreStateless { uint32_t m_max_ports; bool m_has_streams; CLatencyManagerPerPort m_ports[TREX_MAX_PORTS]; - state_e m_state; /* state of all ports */ + state_e m_state; CNodeRing *m_ring_from_cp; CNodeRing *m_ring_to_cp; CCpuUtlDp m_cpu_dp_u; CCpuUtlCp m_cpu_cp_u; + // Used for acking "work" (go out of idle) messages from cp + volatile bool m_ack_start_work_msg __rte_cache_aligned; + }; #endif |