summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rwxr-xr-xsrc/bp_sim.h411
-rwxr-xr-xsrc/common/Network/Packet/EthernetHeader.h3
-rwxr-xr-xsrc/common/basic_utils.cpp23
-rwxr-xr-xsrc/common/basic_utils.h4
-rw-r--r--src/flow_stat.cpp312
-rw-r--r--src/flow_stat.h21
-rw-r--r--src/flow_stat_parser.cpp67
-rw-r--r--src/flow_stat_parser.h36
-rw-r--r--src/gtest/trex_stateless_gtest.cpp2
-rw-r--r--src/internal_api/trex_platform_api.h5
-rw-r--r--src/latency.h1
-rwxr-xr-xsrc/main.cpp4
-rw-r--r--src/main_dpdk.cpp69
-rw-r--r--src/main_dpdk.h1
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_general.cpp44
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_stream.cpp27
-rw-r--r--src/rpc-server/commands/trex_rpc_cmds.h76
-rw-r--r--src/rpc-server/trex_rpc_cmd.cpp46
-rw-r--r--src/rpc-server/trex_rpc_cmd_api.h39
-rw-r--r--src/rpc-server/trex_rpc_cmds_table.cpp3
-rw-r--r--src/rpc-server/trex_rpc_exception_api.h10
-rw-r--r--src/sim/trex_sim_stateless.cpp1
-rw-r--r--src/stateless/cp/trex_api_class.h110
-rw-r--r--src/stateless/cp/trex_exception.h41
-rw-r--r--src/stateless/cp/trex_stateless.cpp3
-rw-r--r--src/stateless/cp/trex_stateless.h39
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp40
-rw-r--r--src/stateless/cp/trex_stateless_port.h12
-rw-r--r--src/stateless/cp/trex_stream.cpp9
-rw-r--r--src/stateless/cp/trex_streams_compiler.cpp6
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp1
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.cpp43
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.h8
33 files changed, 1061 insertions, 456 deletions
diff --git a/src/bp_sim.h b/src/bp_sim.h
index 4b1a88e3..cd85e82b 100755
--- a/src/bp_sim.h
+++ b/src/bp_sim.h
@@ -41,7 +41,7 @@ limitations under the License.
#include <common/Network/Packet/IPv6Header.h>
#include <common/Network/Packet/EthernetHeader.h>
#include <math.h>
-#include <common/bitMan.h>
+#include <common/bitMan.h>
#include <yaml-cpp/yaml.h>
#include "trex_defs.h"
#include "os_time.h"
@@ -97,7 +97,7 @@ public:
MIN_VM_V6=1 // IPv6 addressing
};
uint8_t m_cmd;
- uint8_t m_flags;
+ uint8_t m_flags;
uint16_t m_start_0;
uint16_t m_stop_1;
uint16_t m_add_pkt_len; /* request more length for mbuf packet the size */
@@ -116,16 +116,16 @@ public:
uint16_t m_server_port;
};
-/* this command replace IP in 2 diffrent location and port
+/* this command replace IP in 2 diffrent location and port
-c = 10.1.1.2
-o = 10.1.1.2
+c = 10.1.1.2
+o = 10.1.1.2
m = audio 102000
==>
-c = xx.xx.xx.xx
-o = xx.xx.xx.xx
+c = xx.xx.xx.xx
+o = xx.xx.xx.xx
m = audio yyyy
*/
@@ -248,7 +248,7 @@ class CFlowGenListPerThread ;
/* callback */
void on_node_first(uint8_t plugin_id,CGenNode * node,
- CFlowYamlInfo * template_info,
+ CFlowYamlInfo * template_info,
CTupleTemplateGeneratorSmart * tuple_gen,
CFlowGenListPerThread * flow_gen
);
@@ -259,7 +259,7 @@ rte_mbuf_t * on_node_generate_mbuf(uint8_t plugin_id,CGenNode * node,CFlowPk
class CPreviewMode ;
struct CGenNode;
-/* represent the virtual interface
+/* represent the virtual interface
*/
/* counters per side */
@@ -276,7 +276,7 @@ public:
uint64_t m_tx_drop;
uint64_t m_tx_queue_full;
uint64_t m_tx_alloc_error;
- tx_per_flow_t m_tx_per_flow[MAX_FLOW_STATS];
+ tx_per_flow_t m_tx_per_flow[MAX_FLOW_STATS];
CPerTxthreadTemplateInfo m_template;
public:
@@ -309,10 +309,10 @@ public:
void CVirtualIFPerSideStats::Dump(FILE *fd){
#define DP_B(f) if (f) printf(" %-40s : %lu \n",#f,f)
- DP_B(m_tx_pkt);
+ DP_B(m_tx_pkt);
DP_B(m_tx_rx_check_pkt);
- DP_B(m_tx_bytes);
- DP_B(m_tx_drop);
+ DP_B(m_tx_bytes);
+ DP_B(m_tx_drop);
DP_B(m_tx_alloc_error);
DP_B(m_tx_queue_full);
m_template.Dump(fd);
@@ -342,17 +342,17 @@ public:
/**
* send one packet
- *
+ *
* @param node
- *
- * @return
+ *
+ * @return
*/
virtual int send_node(CGenNode * node) =0;
/**
* send one packet to a specific dir. flush all packets
- *
+ *
* @param dir
* @param m
*/
@@ -361,26 +361,29 @@ public:
/**
- * flush all pending packets into the stream
- *
- * @return
+ * flush all pending packets into the stream
+ *
+ * @return
*/
virtual int flush_tx_queue(void)=0;
-
+ // read all packets from rx_queue on dp core
+ virtual void flush_dp_rx_queue(void) {};
+ // read all packets from rx queue
+ virtual void flush_rx_queue(void) {};
/**
* update the source and destination mac-addr of a given mbuf by global database
- *
+ *
* @param dir
* @param m
- *
- * @return
+ *
+ * @return
*/
virtual int update_mac_addr_from_global_cfg(pkt_dir_t dir, uint8_t * p)=0;
/**
* translate a port_id to the correct dir on the core
- *
+ *
*/
virtual pkt_dir_t port_id_to_dir(uint8_t port_id) {
return (CS_INVALID);
@@ -602,13 +605,13 @@ public:
}
}
- bool get_is_rx_check_enable(){
- return (btGetMaskBit32(m_flags,31,31) ? true:false);
- }
+ bool get_is_rx_check_enable(){
+ return (btGetMaskBit32(m_flags,31,31) ? true:false);
+ }
- void set_rx_check_enable(bool enable){
- btSetMaskBit32(m_flags,31,31,enable?1:0);
- }
+ void set_rx_check_enable(bool enable){
+ btSetMaskBit32(m_flags,31,31,enable?1:0);
+ }
bool get_mac_ip_features_enable(){
@@ -693,7 +696,7 @@ public:
u.m_mac.dest[3]=1;
u.m_mac.src[3]=1;
}
- union {
+ union {
mac_align_t m_mac;
uint8_t m_data[16];
} u;
@@ -717,10 +720,10 @@ public:
};
enum trex_learn_mode_e {
- LEARN_MODE_DISABLED=0,
- LEARN_MODE_TCP_ACK=1,
- LEARN_MODE_IP_OPTION=2,
- LEARN_MODE_MAX=LEARN_MODE_IP_OPTION
+ LEARN_MODE_DISABLED=0,
+ LEARN_MODE_TCP_ACK=1,
+ LEARN_MODE_IP_OPTION=2,
+ LEARN_MODE_MAX=LEARN_MODE_IP_OPTION
};
public:
@@ -736,7 +739,7 @@ public:
m_expected_portd = 4; /* should be at least the number of ports found in the system but could be less */
m_vlan_port[0]=100;
m_vlan_port[1]=100;
- m_rx_check_sample=0;
+ m_rx_check_sample=0;
m_rx_check_hops = 0;
m_io_mode=1;
m_run_flags=0;
@@ -759,12 +762,12 @@ public:
uint32_t m_latency_rate; /* pkt/sec for each thread/port zero disable */
uint32_t m_latency_mask;
uint32_t m_latency_prev;
- uint16_t m_rx_check_sample; /* the sample rate of flows */
+ uint16_t m_rx_check_sample; /* the sample rate of flows */
uint16_t m_rx_check_hops;
uint16_t m_zmq_port;
uint16_t m_telnet_port;
uint16_t m_expected_portd;
- uint16_t m_io_mode; //0,1,2 0 disable, 1- normal , 2 - short
+ uint16_t m_io_mode; //0,1,2 0 disable, 1- normal , 2 - short
uint16_t m_run_flags;
uint8_t m_mac_splitter;
uint8_t m_l_pkt_mode;
@@ -782,7 +785,7 @@ public:
std::string out_file;
std::string prefix;
-
+
CMacAddrCfg m_mac_addr[TREX_MAX_PORTS];
uint8_t * get_src_mac_addr(int if_index){
@@ -861,7 +864,7 @@ public:
void Dump(FILE *fd);
public:
- uint32_t m_mbuf[MBUF_SIZE]; // relative to traffic norm to 2x10G ports
+ uint32_t m_mbuf[MBUF_SIZE]; // relative to traffic norm to 2x10G ports
uint32_t m_num_cores;
};
@@ -869,28 +872,28 @@ public:
typedef uint8_t socket_id_t;
typedef uint8_t port_id_t;
/* the real phsical thread id */
-typedef uint8_t physical_thread_id_t;
+typedef uint8_t physical_thread_id_t;
-typedef uint8_t virtual_thread_id_t;
-/*
+typedef uint8_t virtual_thread_id_t;
+/*
+
+ virtual thread 0 (v0)- is always the master
- virtual thread 0 (v0)- is always the master
-
-for 2 dual ports ( 2x2 =4 ports) the virtual thread looks like that
+for 2 dual ports ( 2x2 =4 ports) the virtual thread looks like that
-----------------
DEFAULT:
-----------------
(0,1) (2,3)
dual-if0 dual-if-1
v1 v2
- v3 v4
+ v3 v4
v5 v6
- v7 v8
-
- rx is v9
+ v7 v8
+
+ rx is v9
- */
+ */
#define MAX_SOCKETS_SUPPORTED (4)
#define MAX_THREADS_SUPPORTED (120)
@@ -904,12 +907,12 @@ public:
/* is socket enabled */
virtual bool is_sockets_enable(socket_id_t socket)=0;
-
+
/* number of main active sockets. socket #0 is always used */
virtual socket_id_t max_num_active_sockets()=0;
virtual ~CPlatformSocketInfoBase() {}
-
+
public:
/* which socket to allocate memory to each port */
virtual socket_id_t port_to_socket(port_id_t port)=0;
@@ -949,7 +952,7 @@ public:
/* is socket enabled */
bool is_sockets_enable(socket_id_t socket);
-
+
/* number of main active sockets. socket #0 is always used */
socket_id_t max_num_active_sockets();
@@ -995,7 +998,7 @@ public:
/* is socket enabled */
bool is_sockets_enable(socket_id_t socket);
-
+
/* number of main active sockets. socket #0 is always used */
socket_id_t max_num_active_sockets();
@@ -1033,7 +1036,7 @@ private:
bool m_sockets_enable[MAX_SOCKETS_SUPPORTED];
uint32_t m_sockets_enabled;
socket_id_t m_socket_per_dual_if[(TREX_MAX_PORTS >> 1)];
-
+
uint32_t m_max_threads_per_dual_if;
uint32_t m_num_dual_if;
@@ -1058,7 +1061,7 @@ public:
/* is socket enabled */
bool is_sockets_enable(socket_id_t socket);
-
+
/* number of main active sockets. socket #0 is always used */
socket_id_t max_num_active_sockets();
@@ -1141,15 +1144,15 @@ public:
public:
rte_mempool_t * m_small_mbuf_pool; /* pool for start packets */
- rte_mempool_t * m_mbuf_pool_128;
- rte_mempool_t * m_mbuf_pool_256;
- rte_mempool_t * m_mbuf_pool_512;
- rte_mempool_t * m_mbuf_pool_1024;
- rte_mempool_t * m_mbuf_pool_2048;
- rte_mempool_t * m_mbuf_pool_4096;
- rte_mempool_t * m_mbuf_pool_9k;
+ rte_mempool_t * m_mbuf_pool_128;
+ rte_mempool_t * m_mbuf_pool_256;
+ rte_mempool_t * m_mbuf_pool_512;
+ rte_mempool_t * m_mbuf_pool_1024;
+ rte_mempool_t * m_mbuf_pool_2048;
+ rte_mempool_t * m_mbuf_pool_4096;
+ rte_mempool_t * m_mbuf_pool_9k;
- rte_mempool_t * m_mbuf_global_nodes;
+ rte_mempool_t * m_mbuf_global_nodes;
uint32_t m_pool_id;
};
@@ -1167,16 +1170,16 @@ public:
return ( m_mem_pool[socket].pktmbuf_alloc_small() );
}
-
+
/**
- * try to allocate small buffers too
- * _alloc allocate big buffers only
- *
+ * try to allocate small buffers too
+ * _alloc allocate big buffers only
+ *
* @param socket
* @param size
- *
- * @return
+ *
+ * @return
*/
static inline rte_mbuf_t * pktmbuf_alloc(socket_id_t socket,uint16_t size){
if (size<FIRST_PKT_SIZE) {
@@ -1232,7 +1235,7 @@ public:
public:
static CRteMemPool m_mem_pool[MAX_SOCKETS_SUPPORTED];
- static uint32_t m_nodes_pool_size;
+ static uint32_t m_nodes_pool_size;
static CParserOption m_options;
static CGlobalMemory m_memory_cfg;
static CPlatformSocketInfo m_socket;
@@ -1320,19 +1323,19 @@ struct CFlowYamlInfo {
m_server_pool_idx = 0;
m_cap_mode=false;
}
-
+
std::string m_name;
std::string m_client_pool_name;
std::string m_server_pool_name;
- double m_k_cps; //k CPS
- double m_restart_time; /* restart time of this template */
- dsec_t m_ipg_sec; // ipg in sec
+ double m_k_cps; //k CPS
+ double m_restart_time; /* restart time of this template */
+ dsec_t m_ipg_sec; // ipg in sec
dsec_t m_rtt_sec; // rtt in sec
- uint32_t m_w;
+ uint32_t m_w;
uint32_t m_wlength;
uint32_t m_limit;
uint32_t m_flowcnt;
- uint8_t m_plugin_id; /* 0 - default , 1 - RTSP160 , 2- RTSP250 */
+ uint8_t m_plugin_id; /* 0 - default , 1 - RTSP160 , 2- RTSP250 */
uint8_t m_client_pool_idx;
uint8_t m_server_pool_idx;
bool m_one_app_server;
@@ -1418,7 +1421,7 @@ public:
NODE_FLAGS_LEARN_MSG_PROCESSED =0x10, /* got NAT msg */
NODE_FLAGS_LATENCY =0x20, /* got NAT msg */
- NODE_FLAGS_INIT_START_FROM_SERVER_SIDE = 0x40,
+ NODE_FLAGS_INIT_START_FROM_SERVER_SIDE = 0x40,
NODE_FLAGS_ALL_FLOW_SAME_PORT_SIDE = 0x80,
NODE_FLAGS_INIT_START_FROM_SERVER_SIDE_SERVER_ADDR = 0x100 /* init packet start from server side with server addr */
};
@@ -1434,8 +1437,8 @@ public:
uint16_t m_src_port;
uint16_t m_flags; /* BIT 0 - DIR ,
- BIT 1 - mbug_cache
- BIT 2 - SAMPLE DUPLICATE */
+ BIT 1 - mbug_cache
+ BIT 2 - SAMPLE DUPLICATE */
double m_time; /* can't change this header - size 16 bytes*/
@@ -1512,7 +1515,7 @@ public:
/* is it possible to cache MBUF */
inline void update_next_pkt_in_flow(void);
- inline void reset_pkt_in_flow(void);
+ inline void reset_pkt_in_flow(void);
inline uint8_t get_plugin_id(void){
return ( m_template_info->m_plugin_id);
}
@@ -1567,8 +1570,8 @@ public:
/* direction for TCP/UDP port */
inline pkt_dir_t cur_pkt_port_addr_dir();
/* from which interface dir to get out */
- inline pkt_dir_t cur_interface_dir();
-
+ inline pkt_dir_t cur_interface_dir();
+
inline void set_mbuf_cache_dir(pkt_dir_t dir){
if (dir) {
@@ -1597,19 +1600,19 @@ public:
public:
- inline void set_rx_check(){
- m_flags |= NODE_FLAGS_SAMPLE_RX_CHECK;
- }
+ inline void set_rx_check(){
+ m_flags |= NODE_FLAGS_SAMPLE_RX_CHECK;
+ }
- inline bool is_rx_check_enabled(){
- return ((m_flags & NODE_FLAGS_SAMPLE_RX_CHECK)?true:false);
- }
+ inline bool is_rx_check_enabled(){
+ return ((m_flags & NODE_FLAGS_SAMPLE_RX_CHECK)?true:false);
+ }
public:
inline void set_nat_first_state(){
btSetMaskBit16(m_flags,4,3,1);
- m_type=FLOW_PKT_NAT;
+ m_type=FLOW_PKT_NAT;
}
inline bool is_nat_first_state(){
@@ -1665,8 +1668,8 @@ public:
bool is_external_is_eq_to_internal_ip(){
/* this API is used to check TRex itself */
- if ( (get_nat_ipv4_addr() == m_src_ip ) &&
- (get_nat_ipv4_port()==m_src_port) &&
+ if ( (get_nat_ipv4_addr() == m_src_ip ) &&
+ (get_nat_ipv4_port()==m_src_port) &&
( get_nat_ipv4_addr_server() == m_dest_ip) ) {
return (true);
}else{
@@ -1704,7 +1707,7 @@ struct CGenNodeDeferPort {
uint16_t m_ports[DEFER_CLIENTS_NUM];
uint8_t m_pool_idx[DEFER_CLIENTS_NUM];
public:
- void init(void){
+ void init(void){
m_type=CGenNode::FLOW_DEFER_PORT_RELEASE;
m_cnt=0;
}
@@ -1724,7 +1727,7 @@ public:
} __rte_cache_aligned ;
-/* run time verification of objects size and offsets
+/* run time verification of objects size and offsets
need to clean this up and derive this objects from base object but require too much refactoring right now
hhaim
*/
@@ -1817,19 +1820,19 @@ public:
/**
* send one packet
- *
+ *
* @param node
- *
- * @return
+ *
+ * @return
*/
virtual int send_node(CGenNode * node);
/**
- * flush all pending packets into the stream
- *
- * @return
+ * flush all pending packets into the stream
+ *
+ * @return
*/
virtual int flush_tx_queue(void);
@@ -1858,7 +1861,7 @@ public:
/**
* same as regular STL but no I/O (dry run)
- *
+ *
* @author imarom (07-Jan-16)
*/
class CErfIFStlNull : public CErfIFStl {
@@ -1959,7 +1962,7 @@ public:
int open_file(std::string file_name,
CPreviewMode * preview);
int close_file(CFlowGenListPerThread * thread);
- int flush_file(dsec_t max_time,
+ int flush_file(dsec_t max_time,
dsec_t d_time,
bool always,
CFlowGenListPerThread * thread,
@@ -2012,7 +2015,7 @@ public:
CPreviewMode m_preview_mode;
uint64_t m_cnt;
uint64_t m_limit;
- CTimeHistogram m_realtime_his;
+ CTimeHistogram m_realtime_his;
};
@@ -2128,7 +2131,7 @@ inline bool CFlowKey::operator ==(const CFlowKey& rhs) const{
#define IS_PCAP_TIMING 7
-// 8-12 is used
+// 8-12 is used
#define FLOW_ID 8
@@ -2164,9 +2167,9 @@ public:
}
private:
- // per direction info
+ // per direction info
uint16_t m_dir_pkt_num; // pkt id
- uint16_t m_max_dir_flow_pkts;
+ uint16_t m_max_dir_flow_pkts;
};
@@ -2219,7 +2222,7 @@ public:
}
/**
* start from zero 0,1,2,.. , it is on global flow if you have couple of flows it will count all of the flows
- *
+ *
* flow FlowPktNum
* 0 0
* 0 1
@@ -2227,8 +2230,8 @@ public:
* 1 0
* 1 1
* 2 0
- *
- * @return
+ *
+ * @return
*/
inline uint32_t getFlowPktNum(){
return ( m_flow_pkt_num);
@@ -2252,7 +2255,7 @@ public:
}
- /* return true if this packet in diff direction from prev flow packet ,
+ /* return true if this packet in diff direction from prev flow packet ,
if true need to choose RTT else IPG for inter packet gap */
inline bool IsRtt(){
return (btGetMaskBit32(m_flags,IS_RTT,IS_RTT) ? true:false);
@@ -2313,7 +2316,7 @@ public:
inline void SetId(uint16_t _id){
btSetMaskBit32(m_flags,31,16,_id);
-
+
}
inline uint16_t getId(){
return ( ( uint16_t)btGetMaskBit32(m_flags,31,16));
@@ -2328,7 +2331,7 @@ public:
return (btGetMaskBit32(m_flags,IS_LAST_PKT_S,IS_LAST_PKT_E) ? true:false);
}
- // there could be couple of flows per template in case of plugin
+ // there could be couple of flows per template in case of plugin
inline void SetMaxPktsPerFlow(uint32_t pkts){
assert(pkts<65000);
m_max_flow_pkts=pkts;
@@ -2336,7 +2339,7 @@ public:
inline uint16_t GetMaxPktsPerFlow(){
return ( m_max_flow_pkts );
}
- // there could be couple of flows per template in case of plugin
+ // there could be couple of flows per template in case of plugin
inline void SetMaxFlowTimeout(double sec){
//assert (sec<65000);
sec = sec*2.0+5.0;
@@ -2369,12 +2372,12 @@ public:
private:
uint32_t m_flags;
- uint16_t m_flow_pkt_num; // packet number inside the flow
- uint8_t m_plugin_id; // packet number inside the flow
+ uint16_t m_flow_pkt_num; // packet number inside the flow
+ uint8_t m_plugin_id; // packet number inside the flow
uint8_t m_pad;
uint16_t m_max_flow_pkts; // how many packet per this flow getFlowId()
- uint16_t m_max_flow_aging; // maximum aging in sec
- CPacketDescriptorPerDir m_per_dir[CS_NUM]; // per direction info
+ uint16_t m_max_flow_aging; // maximum aging in sec
+ CPacketDescriptorPerDir m_per_dir[CS_NUM]; // per direction info
};
@@ -2427,7 +2430,7 @@ public:
class CPacketIndication {
public:
- dsec_t m_cap_ipg; /* ipg from cap file */
+ dsec_t m_cap_ipg; /* ipg from cap file */
CCapPktRaw * m_packet;
CFlow * m_flow;
@@ -2437,10 +2440,10 @@ public:
IPv6Header * m_ipv6;
} l3;
bool m_is_ipv6;
- union {
+ union {
TCPHeader * m_tcp;
UDPHeader * m_udp;
- ICMPHeader * m_icmp;
+ ICMPHeader * m_icmp;
} l4;
uint8_t * m_payload;
uint16_t m_payload_len;
@@ -2489,10 +2492,10 @@ public:
/**
- * return the application ipv4/ipv6 option offset
+ * return the application ipv4/ipv6 option offset
* if learn bit is ON , it is always the first options ( IPV6/IPV4)
- *
- * @return
+ *
+ * @return
*/
uint32_t getIpAppOptionOffset(){
if ( is_ipv6() ) {
@@ -2585,7 +2588,7 @@ class CPacketParser {
public:
bool Create();
void Delete();
- bool ProcessPacket(CPacketIndication * pkt_indication,
+ bool ProcessPacket(CPacketIndication * pkt_indication,
CCapPktRaw * raw_packet);
public:
CCPacketParserCounters m_counter;
@@ -2706,12 +2709,12 @@ public:
inline rte_mbuf_t * do_generate_new_mbuf_ex(CGenNode * node,CFlowInfo * flow_info);
inline rte_mbuf_t * do_generate_new_mbuf_ex_big(CGenNode * node,CFlowInfo * flow_info);
inline rte_mbuf_t * do_generate_new_mbuf_ex_vm(CGenNode * node,
- CFlowInfo * flow_info, int16_t * s_size);
+ CFlowInfo * flow_info, int16_t * s_size);
public:
- /* push the number of bytes into the packets and make more room
- should be used by NAT feature that should have ipv4 option in the first packet
- this function should not be called in runtime, only when template is loaded due to it heavey cost of operation ( malloc/free memory )
+ /* push the number of bytes into the packets and make more room
+ should be used by NAT feature that should have ipv4 option in the first packet
+ this function should not be called in runtime, only when template is loaded due to it heavey cost of operation ( malloc/free memory )
*/
char * push_ipv4_option_offline(uint8_t bytes);
char * push_ipv6_option_offline(uint8_t bytes);
@@ -2720,10 +2723,10 @@ public:
/**
* mark this packet as learn packet
- * should
+ * should
* 1. push ipv4 option ( 8 bytes)
- * 2. mask the packet as learn
- * 3. update the option pointer
+ * 2. mask the packet as learn
+ * 3. update the option pointer
*/
void mask_as_learn();
@@ -2750,7 +2753,7 @@ private:
public:
CPacketIndication m_pkt_indication;
- CCapPktRaw * m_packet;
+ CCapPktRaw * m_packet;
rte_mbuf_t * m_big_mbuf[MAX_SOCKETS_SUPPORTED]; /* allocate big mbug per socket */
};
@@ -2764,10 +2767,10 @@ inline void CFlowPktInfo::update_pkt_info2(char *p,
int update_len ,
CGenNode * node
){
- IPHeader * ipv4=
+ IPHeader * ipv4=
(IPHeader *)(p + m_pkt_indication.getFastIpOffsetFast());
- EthernetHeader * et =
+ EthernetHeader * et =
(EthernetHeader * )(p + m_pkt_indication.getFastEtherOffset());
(void)et;
@@ -2820,7 +2823,7 @@ inline void CFlowPktInfo::update_pkt_info2(char *p,
m_tcp->setSourcePort(flow_info->server_port);
}
}
-
+
}else {
if ( m_pkt_indication.m_desc.IsUdp() ){
UDPHeader * m_udp =(UDPHeader *)(p +m_pkt_indication.getFastTcpOffset() );
@@ -2848,10 +2851,10 @@ inline void CFlowPktInfo::update_pkt_info2(char *p,
inline void CFlowPktInfo::update_pkt_info(char *p,
CGenNode * node){
- IPHeader * ipv4=
+ IPHeader * ipv4=
(IPHeader *)(p + m_pkt_indication.getFastIpOffsetFast());
- EthernetHeader * et =
+ EthernetHeader * et =
(EthernetHeader * )(p + m_pkt_indication.getFastEtherOffset());
(void)et;
@@ -2863,7 +2866,7 @@ inline void CFlowPktInfo::update_pkt_info(char *p,
if ( unlikely (m_pkt_indication.is_ipv6())) {
-
+
// Update the IPv6 address
IPv6Header *ipv6= (IPv6Header *)ipv4;
@@ -2886,22 +2889,22 @@ inline void CFlowPktInfo::update_pkt_info(char *p,
ipv4->setTimeToLive(TTL_RESERVE_DUPLICATE);
/* first ipv4 option add the info in case of learn packet, usualy only the first packet */
- if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_IP_OPTION)) {
- CNatOption *lpNat =(CNatOption *)ipv4->getOption();
- lpNat->set_fid(node->get_short_fid());
- lpNat->set_thread_id(node->get_thread_id());
- } else {
- // This method only work on first TCP SYN
- if (ipv4->getProtocol() == IPPROTO_TCP) {
- TCPHeader *tcp = (TCPHeader *)(((uint8_t *)ipv4) + ipv4->getHeaderLength());
- if (tcp->getSynFlag()) {
- tcp->setAckNumber(CNatRxManager::calc_tcp_ack_val(node->get_short_fid(), node->get_thread_id()));
- }
+ if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_IP_OPTION)) {
+ CNatOption *lpNat =(CNatOption *)ipv4->getOption();
+ lpNat->set_fid(node->get_short_fid());
+ lpNat->set_thread_id(node->get_thread_id());
+ } else {
+ // This method only work on first TCP SYN
+ if (ipv4->getProtocol() == IPPROTO_TCP) {
+ TCPHeader *tcp = (TCPHeader *)(((uint8_t *)ipv4) + ipv4->getHeaderLength());
+ if (tcp->getSynFlag()) {
+ tcp->setAckNumber(CNatRxManager::calc_tcp_ack_val(node->get_short_fid(), node->get_thread_id()));
+ }
#ifdef NAT_TRACE_
- printf(" %.3f : flow_id: %x thread_id %x TCP ack %x\n",now_sec(), node->get_short_fid(), node->get_thread_id(), tcp->getAckNumber());
+ printf(" %.3f : flow_id: %x thread_id %x TCP ack %x\n",now_sec(), node->get_short_fid(), node->get_thread_id(), tcp->getAckNumber());
#endif
- }
- }
+ }
+ }
}
/* in all cases update the ip using the outside ip */
@@ -3005,7 +3008,7 @@ inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf_ex(CGenNode * node,
BP_ASSERT ( (((uintptr_t)m_packet->raw) & 0x7f )== 0) ;
- memcpy(p,m_packet->raw,len);
+ memcpy(p,m_packet->raw,len);
update_pkt_info2(p,flow_info,0,node);
@@ -3014,7 +3017,7 @@ inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf_ex(CGenNode * node,
return(m);
}
-
+
inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf_ex_big(CGenNode * node,
CFlowInfo * flow_info){
rte_mbuf_t * m;
@@ -3029,7 +3032,7 @@ inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf_ex_big(CGenNode * node,
BP_ASSERT ( (((uintptr_t)m_packet->raw) & 0x7f )== 0) ;
- memcpy(p,m_packet->raw,len);
+ memcpy(p,m_packet->raw,len);
update_pkt_info2(p,flow_info,0,node);
@@ -3055,7 +3058,7 @@ inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf_ex_vm(CGenNode * node,
/* alloc big buffer to update it*/
m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), len);
- assert(m);
+ assert(m);
/* append the additional bytes requested and update later */
char *p=rte_pktmbuf_append(m, len);
@@ -3063,7 +3066,7 @@ inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf_ex_vm(CGenNode * node,
BP_ASSERT ( (((uintptr_t)m_packet->raw) & 0x7f )== 0) ;
/* copy the headers until the payload */
- memcpy(p, m_packet->raw, m_pkt_indication.getPayloadOffset() );
+ memcpy(p, m_packet->raw, m_pkt_indication.getPayloadOffset() );
CMiniVM vm;
vm.m_pkt_info = this;
vm.m_pyload_mbuf_ptr = p+m_pkt_indication.getPayloadOffset();
@@ -3077,7 +3080,7 @@ inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf_ex_vm(CGenNode * node,
/* update IP length , and TCP checksum , we can accelerate this using hardware ! */
uint16_t pkt_adjust = vm.m_new_pkt_size - m_packet->pkt_len;
update_pkt_info2(p,flow_info,pkt_adjust,node);
-
+
/* return change in packet size due to packet tranforms */
*s_size = vm.m_new_pkt_size - m_packet->pkt_len;
@@ -3110,7 +3113,7 @@ inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf(CGenNode * node){
BP_ASSERT ( (((uintptr_t)m_packet->raw) & 0x7f )== 0) ;
- memcpy(p,m_packet->raw,len);
+ memcpy(p,m_packet->raw,len);
update_pkt_info(p,node);
@@ -3119,7 +3122,7 @@ inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf(CGenNode * node){
return m;
}
-
+
inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf_big(CGenNode * node){
rte_mbuf_t * m;
uint16_t len = m_packet->pkt_len;
@@ -3133,7 +3136,7 @@ inline rte_mbuf_t * CFlowPktInfo::do_generate_new_mbuf_big(CGenNode * node){
BP_ASSERT ( (((uintptr_t)m_packet->raw) & 0x7f )== 0) ;
- memcpy(p,m_packet->raw,len);
+ memcpy(p,m_packet->raw,len);
update_pkt_info(p,node);
@@ -3206,15 +3209,15 @@ public:
class CCapFileFlowInfo {
public:
enum load_cap_file_err {
- kOK = 0,
- kFileNotExist,
- kNegTimestamp,
- kNoSyn,
- kTCPOffsetTooBig,
- kNoTCPFromServer,
- kPktNotSupp,
- kPktProcessFail,
- kCapFileErr
+ kOK = 0,
+ kFileNotExist,
+ kNegTimestamp,
+ kNoSyn,
+ kTCPOffsetTooBig,
+ kNoTCPFromServer,
+ kPktNotSupp,
+ kPktProcessFail,
+ kCapFileErr
};
bool Create();
@@ -3253,7 +3256,7 @@ public:
return (m_total_errors);
}
- // return the cap file length in sec
+ // return the cap file length in sec
double get_cap_file_length_sec();
void get_total_memory(CCCapFileMemoryUsage & memory);
@@ -3287,8 +3290,8 @@ public:
// IPv4 addressing
// IPv6 addressing
- std::vector <uint16_t> m_src_ipv6;
- std::vector <uint16_t> m_dst_ipv6;
+ std::vector <uint16_t> m_src_ipv6;
+ std::vector <uint16_t> m_dst_ipv6;
bool m_ipv6_set;
// new section
@@ -3342,7 +3345,7 @@ public:
double duration_sec;
double m_cps;
double m_mb_sec;
- double m_mB_sec;
+ double m_mB_sec;
double m_c_flows;
double m_pps ;
double m_total_Mbytes ;
@@ -3367,7 +3370,7 @@ public:
class CFlowGeneratorRecPerThread {
public:
- bool Create(CTupleGeneratorSmart * global_gen,
+ bool Create(CTupleGeneratorSmart * global_gen,
CFlowYamlInfo * info,
CFlowsYamlInfo * yaml_flow_info,
CCapFileFlowInfo * flow_info,
@@ -3388,11 +3391,11 @@ public:
CCapFileFlowInfo * m_flow_info;
CFlowYamlInfo * m_info;
CFlowsYamlInfo * m_flows_info;
- CPolicer m_policer;
+ CPolicer m_policer;
uint16_t m_id ;
uint32_t m_thread_id;
bool m_tuple_gen_was_set;
-} __rte_cache_aligned;
+} __rte_cache_aligned;
@@ -3405,16 +3408,16 @@ public:
uint16_t _id);
void Delete();
public:
-
+
void Dump(FILE *fd);
void getFlowStats(CFlowStats * stats);
public:
CCapFileFlowInfo m_flow_info;
CFlowYamlInfo * m_info;
CFlowsYamlInfo * m_flows_info;
- CPolicer m_policer;
+ CPolicer m_policer;
uint16_t m_id;
-private:
+private:
void fixup_ipg_if_needed();
};
@@ -3423,7 +3426,7 @@ public:
CPPSMeasure(){
reset();
}
- //reset
+ //reset
void reset(void){
m_start=false;
m_last_time_msec=0;
@@ -3453,7 +3456,7 @@ public:
class CBwMeasure {
public:
CBwMeasure();
- //reset
+ //reset
void reset(void);
//add packet size
double add(uint64_t size);
@@ -3498,7 +3501,7 @@ public:
friend class CNodeGenerator;
friend class CPluginCallbackSimple;
friend class CCapFileFlowInfo;
-
+
typedef CGenericMap<flow_id_t,CGenNode> flow_id_node_t;
bool Create(uint32_t thread_id,
@@ -3518,23 +3521,23 @@ public:
m_node_gen.set_vif(v_if);
}
- /* return the dual port ID this thread is attached to in 4 ports configuration
- there are 2 dual-ports
+ /* return the dual port ID this thread is attached to in 4 ports configuration
+ there are 2 dual-ports
thread 0 - dual 0
thread 1 - dual 1
thread 2 - dual 0
thread 3 - dual 1
-
- */
+
+ */
uint32_t getDualPortId();
public :
double get_total_kcps();
double get_total_kcps(uint8_t pool_idx, bool is_client);
double get_delta_flow_is_sec();
- double get_longest_flow();
- double get_longest_flow(uint8_t pool_idx, bool is_client);
+ double get_longest_flow();
+ double get_longest_flow(uint8_t pool_idx, bool is_client);
void inc_current_template(void);
int generate_flows_roundrobin(bool *done);
int reschedule_flow(CGenNode *node);
@@ -3627,9 +3630,9 @@ public:
CFlowGenList * m_flow_list;
rte_mempool_t * m_node_pool;
- std::vector<CFlowGeneratorRecPerThread *> m_cap_gen;
+ std::vector<CFlowGeneratorRecPerThread *> m_cap_gen;
- CFlowsYamlInfo m_yaml_info;
+ CFlowsYamlInfo m_yaml_info;
CTupleGeneratorSmart m_smart_gen;
@@ -3644,7 +3647,7 @@ public:
double m_stop_time_sec;
CPreviewMode m_preview_mode;
-public:
+public:
CFlowGenStats m_stats;
CBwMeasure m_mb_sec;
CCpuUtlDp m_cpu_dp_u;
@@ -3663,7 +3666,7 @@ private:
bool m_terminated_by_master;
private:
- uint8_t m_cacheline_pad[RTE_CACHE_LINE_SIZE][19]; // improve prefech
+ uint8_t m_cacheline_pad[RTE_CACHE_LINE_SIZE][19]; // improve prefech
} __rte_cache_aligned ;
inline CGenNode * CFlowGenListPerThread::create_node(void){
@@ -3726,7 +3729,7 @@ public:
public:
std::vector<CFlowGeneratorRec *> m_cap_gen; /* global info */
CFlowsYamlInfo m_yaml_info; /* global yaml*/
- std::vector<CFlowGenListPerThread *> m_threads_info;
+ std::vector<CFlowGenListPerThread *> m_threads_info;
CFlowGenListMac m_mac_info;
};
@@ -3761,19 +3764,19 @@ inline void CCapFileFlowInfo::generate_flow(CTupleTemplateGeneratorSmart * tup
node->m_flow_info = this;
node->m_flags=0;
node->m_template_info =template_info;
- node->m_tuple_gen = tuple_gen->get_gen();
+ node->m_tuple_gen = tuple_gen->get_gen();
node->m_src_ip= tuple.getClient();
node->m_dest_ip = tuple.getServer();
node->m_src_idx = tuple.getClientId();
node->m_dest_idx = tuple.getServerId();
node->m_src_port = tuple.getClientPort();
- memcpy(&node->m_src_mac,
- tuple.getClientMac(),
+ memcpy(&node->m_src_mac,
+ tuple.getClientMac(),
sizeof(mac_addr_align_t));
node->m_plugin_info =(void *)0;
if ( unlikely( CGlobalInfo::is_learn_mode() ) ){
- // check if flow is two direction
+ // check if flow is two direction
if ( lp->m_pkt_indication.m_desc.IsBiDirectionalFlow() ) {
/* we are in learn mode */
CFlowGenListPerThread * lpThread=gen->Parent();
@@ -3822,7 +3825,7 @@ inline void CFlowGeneratorRecPerThread::generate_flow(CNodeGenerator * gen,
uint64_t flow_id,
CGenNode * node){
- m_flow_info->generate_flow(&tuple_gen,
+ m_flow_info->generate_flow(&tuple_gen,
gen,
time,
flow_id,
@@ -3869,7 +3872,7 @@ inline void CGenNode::update_next_pkt_in_flow(void){
m_pkt_info = m_flow_info->GetPacket((pkt_index-1));
}
-inline void CGenNode::reset_pkt_in_flow(void){
+inline void CGenNode::reset_pkt_in_flow(void){
m_pkt_info = m_flow_info->GetPacket(0);
}
@@ -3898,7 +3901,7 @@ public:
class CPluginCallbackSimple : public CPluginCallback {
public:
virtual void on_node_first(uint8_t plugin_id,CGenNode * node,
- CFlowYamlInfo * template_info,
+ CFlowYamlInfo * template_info,
CTupleTemplateGeneratorSmart * tuple_gen,
CFlowGenListPerThread * flow_gen);
virtual void on_node_last(uint8_t plugin_id,CGenNode * node);
@@ -3950,7 +3953,7 @@ inline pkt_dir_t CGenNode::cur_interface_dir(){
return (is_eligible_from_server_side()?SERVER_SIDE:CLIENT_SIDE);
}else{
return ( is_init ?CLIENT_SIDE:SERVER_SIDE);
- }
+ }
}
diff --git a/src/common/Network/Packet/EthernetHeader.h b/src/common/Network/Packet/EthernetHeader.h
index 87d1ed91..c9dcdbe2 100755
--- a/src/common/Network/Packet/EthernetHeader.h
+++ b/src/common/Network/Packet/EthernetHeader.h
@@ -1,5 +1,5 @@
/*
-Copyright (c) 2015-2015 Cisco Systems, Inc.
+Copyright (c) 2015-2016 Cisco Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@ limitations under the License.
#include "PacketHeaderBase.h"
#include "MacAddress.h"
+#define ETH_HDR_LEN 14
/**
* This class encapsulates an ethernet header.
diff --git a/src/common/basic_utils.cpp b/src/common/basic_utils.cpp
index 34c37755..4f5578a6 100755
--- a/src/common/basic_utils.cpp
+++ b/src/common/basic_utils.cpp
@@ -17,6 +17,7 @@ limitations under the License.
#include <ctype.h>
#include <stdio.h>
#include <string>
+#include <sstream>
bool utl_is_file_exists (const std::string& name) {
if (FILE *file = fopen(name.c_str(), "r")) {
@@ -175,3 +176,25 @@ void utl_macaddr_to_str(const uint8_t *macaddr, std::string &output) {
}
}
+
+/**
+ * generate a random connection handler
+ *
+ */
+std::string
+utl_generate_random_str(unsigned int &seed, int len) {
+ std::stringstream ss;
+
+ static const char alphanum[] =
+ "0123456789"
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+ "abcdefghijklmnopqrstuvwxyz";
+
+ /* generate 8 bytes of random handler */
+ for (int i = 0; i < len; ++i) {
+ ss << alphanum[rand_r(&seed) % (sizeof(alphanum) - 1)];
+ }
+
+ return (ss.str());
+}
+
diff --git a/src/common/basic_utils.h b/src/common/basic_utils.h
index 77282eea..63e858ab 100755
--- a/src/common/basic_utils.h
+++ b/src/common/basic_utils.h
@@ -21,8 +21,6 @@ limitations under the License.
#include <stdio.h>
#include <string>
-
-
/**
* the round must be power 2 e.g 2,4,8...
*
@@ -87,6 +85,8 @@ bool utl_is_file_exists (const std::string& name) ;
void utl_macaddr_to_str(const uint8_t *macaddr, std::string &output);
+std::string utl_generate_random_str(unsigned int &seed, int len);
+
#endif
diff --git a/src/flow_stat.cpp b/src/flow_stat.cpp
index 778c92b9..13f8eb16 100644
--- a/src/flow_stat.cpp
+++ b/src/flow_stat.cpp
@@ -18,6 +18,32 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
+
+/*
+Important classes in this file:
+CFlowStatUserIdInfo - Information about one packet group id
+CFlowStatUserIdMap - Mapping between packet group id (chosen by user) and hardware counter id
+CFlowStatHwIdMap - Mapping between hardware id and packet group id
+CFlowStatRuleMgr - API to users of the file
+
+General idea of operation:
+For each stream needing flow statistics, the user provides packet group id (pg_id). Few streams can have the same pg_id.
+We maintain reference count.
+When doing start_stream, for the first stream in pg_id, hw_id is associated with the pg_id, and relevant hardware rules are
+inserted (on supported hardware). When stopping all streams with the pg_id, the hw_id <--> pg_id mapping is removed, hw_id is
+returned to the free hw_id pool, and hardware rules are removed. Counters for the pg_id are kept.
+If starting streams again, new hw_id will be assigned, and counters will continue from where they stopped. Only When deleting
+all streams using certain pg_id, infromation about this pg_id will be freed.
+
+For each stream we keep state in the m_rx_check.m_hw_id field. Since we keep reference count for certain structs, we want to
+protect from illegal operations, like starting stream while it is already starting, stopping when it is stopped...
+State machine is:
+stream_init: HW_ID_INIT
+stream_add: HW_ID_FREE
+stream_start: legal hw_id (range is 0..MAX_FLOW_STATS)
+stream_stop: HW_ID_FREE
+stream_del: HW_ID_INIT
+ */
#include <sstream>
#include <string>
#include <iostream>
@@ -26,6 +52,7 @@
#include "internal_api/trex_platform_api.h"
#include "trex_stateless.h"
#include "trex_stateless_messaging.h"
+#include "trex_stateless_rx_core.h"
#include "trex_stream.h"
#include "flow_stat_parser.h"
#include "flow_stat.h"
@@ -33,8 +60,8 @@
#define FLOW_STAT_ADD_ALL_PORTS 255
-static const uint16_t FREE_HW_ID = UINT16_MAX;
-static bool no_stat_supported = true;
+static const uint16_t HW_ID_INIT = UINT16_MAX;
+static const uint16_t HW_ID_FREE = UINT16_MAX - 1;
inline std::string methodName(const std::string& prettyFunction)
{
@@ -48,6 +75,11 @@ inline std::string methodName(const std::string& prettyFunction)
#define __METHOD_NAME__ methodName(__PRETTY_FUNCTION__)
#ifdef __DEBUG_FUNC_ENTRY__
#define FUNC_ENTRY (std::cout << __METHOD_NAME__ << std::endl);
+#ifdef __STREAM_DUMP__
+#define stream_dump(stream) stream->Dump(stderr)
+#else
+#define stream_dump(stream)
+#endif
#else
#define FUNC_ENTRY
#endif
@@ -107,7 +139,7 @@ int CFlowStatUserIdInfo::add_stream(uint8_t proto) {
#endif
if (proto != m_proto)
- return -1;
+ throw TrexException("Can't use same pg_id for streams with different l4 protocol");
m_ref_count++;
@@ -147,7 +179,7 @@ uint16_t CFlowStatUserIdMap::get_hw_id(uint32_t user_id) {
CFlowStatUserIdInfo *cf = find_user_id(user_id);
if (cf == NULL) {
- return FREE_HW_ID;
+ return HW_ID_FREE;
} else {
return cf->get_hw_id();
}
@@ -198,7 +230,7 @@ int CFlowStatUserIdMap::add_stream(uint32_t user_id, uint8_t proto) {
if (! c_user_id) {
c_user_id = add_user_id(user_id, proto);
if (! c_user_id)
- return -1;
+ throw TrexException("Failed adding statistic counter - Failure in add_stream");
return 0;
} else {
return c_user_id->add_stream(proto);
@@ -214,11 +246,11 @@ int CFlowStatUserIdMap::del_stream(uint32_t user_id) {
c_user_id = find_user_id(user_id);
if (! c_user_id) {
- return -1;
+ throw TrexException("Trying to delete stream which does not exist");
}
if (c_user_id->del_stream() == 0) {
- // ref count of this port became 0. can release this entry.
+ // ref count of this entry became 0. can release this entry.
m_map.erase(user_id);
delete c_user_id;
}
@@ -237,13 +269,13 @@ int CFlowStatUserIdMap::start_stream(uint32_t user_id, uint16_t hw_id) {
if (! c_user_id) {
fprintf(stderr, "%s Error: Trying to associate hw id %d to user_id %d but it does not exist\n"
, __func__, hw_id, user_id);
- return -1;
+ throw TrexException("Internal error: Trying to associate non exist group id");
}
if (c_user_id->is_hw_id()) {
- fprintf(stderr, "%s Error: Trying to associate hw id %d to user_id %d but it is already associate to %u\n"
+ fprintf(stderr, "%s Error: Trying to associate hw id %d to user_id %d but it is already associated to %u\n"
, __func__, hw_id, user_id, c_user_id->get_hw_id());
- return -1;
+ throw TrexException("Internal error: Trying to associate used packet group id to different hardware counter");
}
c_user_id->set_hw_id(hw_id);
c_user_id->add_started_stream();
@@ -260,9 +292,9 @@ int CFlowStatUserIdMap::start_stream(uint32_t user_id) {
c_user_id = find_user_id(user_id);
if (! c_user_id) {
- fprintf(stderr, "%s Error: Trying to start stream on user_id %d but it does not exist\n"
+ fprintf(stderr, "%s Error: Trying to start stream on pg_id %d but it does not exist\n"
, __func__, user_id);
- return -1;
+ throw TrexException("Trying to start stream with non exist packet group id");
}
c_user_id->add_started_stream();
@@ -281,9 +313,9 @@ int CFlowStatUserIdMap::stop_stream(uint32_t user_id) {
c_user_id = find_user_id(user_id);
if (! c_user_id) {
- fprintf(stderr, "%s Error: Trying to stop stream on user_id %d but it does not exist\n"
+ fprintf(stderr, "%s Error: Trying to stop stream on pg_id %d but it does not exist\n"
, __func__, user_id);
- return -1;
+ throw TrexException("Trying to stop stream with non exist packet group id");
}
return c_user_id->stop_started_stream();
@@ -332,7 +364,7 @@ uint16_t CFlowStatUserIdMap::unmap(uint32_t user_id) {
CFlowStatHwIdMap::CFlowStatHwIdMap() {
m_num_free = MAX_FLOW_STATS;
for (int i = 0; i < MAX_FLOW_STATS; i++) {
- m_map[i] = FREE_HW_ID;
+ m_map[i] = HW_ID_FREE;
}
}
@@ -357,11 +389,11 @@ std::ostream& operator<<(std::ostream& os, const CFlowStatHwIdMap& cf) {
uint16_t CFlowStatHwIdMap::find_free_hw_id() {
for (int i = 0; i < MAX_FLOW_STATS; i++) {
- if (m_map[i] == FREE_HW_ID)
+ if (m_map[i] == HW_ID_FREE)
return i;
}
- return FREE_HW_ID;
+ return HW_ID_FREE;
}
void CFlowStatHwIdMap::map(uint16_t hw_id, uint32_t user_id) {
@@ -378,7 +410,7 @@ void CFlowStatHwIdMap::unmap(uint16_t hw_id) {
std::cout << __METHOD_NAME__ << " hw id:" << hw_id << std::endl;
#endif
- m_map[hw_id] = FREE_HW_ID;
+ m_map[hw_id] = HW_ID_FREE;
m_num_free++;
}
@@ -388,6 +420,34 @@ CFlowStatRuleMgr::CFlowStatRuleMgr() {
m_max_hw_id = -1;
m_num_started_streams = 0;
m_ring_to_rx = NULL;
+ m_capabilities = 0;
+ m_parser = NULL;
+ m_rx_core = NULL;
+}
+
+CFlowStatRuleMgr::~CFlowStatRuleMgr() {
+ if (m_parser)
+ delete m_parser;
+}
+
+void CFlowStatRuleMgr::create() {
+ uint16_t num_counters, capabilities;
+ TrexStateless *tstateless = get_stateless_obj();
+ assert(tstateless);
+
+ m_api = tstateless->get_platform_api();
+ assert(m_api);
+ m_api->get_interface_stat_info(0, num_counters, capabilities);
+ m_api->get_port_num(m_num_ports);
+ for (uint8_t port = 0; port < m_num_ports; port++) {
+ assert(m_api->reset_hw_flow_stats(port) == 0);
+ }
+ m_ring_to_rx = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
+ assert(m_ring_to_rx);
+ m_rx_core = get_rx_sl_core_obj();
+ m_parser = m_api->get_flow_stat_parser();
+ assert(m_parser);
+ m_capabilities = capabilities;
}
std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf) {
@@ -397,110 +457,111 @@ std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf) {
return os;
}
-int CFlowStatRuleMgr::compile_stream(const TrexStream * stream, Cxl710Parser &parser) {
+int CFlowStatRuleMgr::compile_stream(const TrexStream * stream, CFlowStatParser *parser) {
#ifdef __DEBUG_FUNC_ENTRY__
std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << " en:";
std::cout << stream->m_rx_check.m_enabled << std::endl;
#endif
- // currently we support only IP ID rule types
- // all our ports are the same type, so testing port 0 is enough
- uint16_t num_counters, capabilities;
- m_api->get_interface_stat_info(0, num_counters, capabilities);
- if ((capabilities & TrexPlatformApi::IF_STAT_IPV4_ID) == 0) {
- return -2;
- }
-
- if (parser.parse(stream->m_pkt.binary, stream->m_pkt.len) != 0) {
+ if (parser->parse(stream->m_pkt.binary, stream->m_pkt.len) != 0) {
// if we could not parse the packet, but no stat count needed, it is probably OK.
if (stream->m_rx_check.m_enabled) {
fprintf(stderr, "Error: %s - Compilation failed\n", __func__);
- return -1;
+ throw TrexException("Failed parsing given packet for flow stat. Probably bad packet format.");
} else {
return 0;
}
}
- if (!parser.is_fdir_supported()) {
+ if (!parser->is_stat_supported()) {
if (stream->m_stream_id <= 0) {
- // rx stat not needed. Do nothing.
+ // flow stat not needed. Do nothing.
return 0;
} else {
- // rx stat needed, but packet format is not supported
- fprintf(stderr, "Error: %s - Unsupported packet format for rx stat\n", __func__);
- return -1;
+ // flow stat needed, but packet format is not supported
+ fprintf(stderr, "Error: %s - Unsupported packet format for flow stat\n", __func__);
+ throw TrexException("Unsupported packet format for flow stat on given interface type");
}
}
return 0;
}
-int CFlowStatRuleMgr::add_stream(const TrexStream * stream) {
+void CFlowStatRuleMgr::copy_state(TrexStream * from, TrexStream * to) {
+ to->m_rx_check.m_hw_id = from->m_rx_check.m_hw_id;
+}
+void CFlowStatRuleMgr::init_stream(TrexStream * stream) {
+ stream->m_rx_check.m_hw_id = HW_ID_INIT;
+}
+
+int CFlowStatRuleMgr::add_stream(TrexStream * stream) {
#ifdef __DEBUG_FUNC_ENTRY__
std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl;
+ stream_dump(stream);
#endif
+ if (! stream->m_rx_check.m_enabled) {
+ return 0;
+ }
+
// Init everything here, and not in the constructor, since we relay on other objects
// By the time a stream is added everything else is initialized.
if (! m_api ) {
- TrexStateless *tstateless = get_stateless_obj();
- m_api = tstateless->get_platform_api();
- uint16_t num_counters, capabilities;
- m_api->get_interface_stat_info(0, num_counters, capabilities);
- if ((capabilities & TrexPlatformApi::IF_STAT_IPV4_ID) == 0) {
- // All our interfaces are from the same type. If statistics not supported.
- // no operation will work
- return -1;
- } else {
- no_stat_supported = false;
- }
- m_api->get_port_num(m_num_ports);
- for (uint8_t port = 0; port < m_num_ports; port++) {
- assert(m_api->reset_hw_flow_stats(port) == 0);
- }
- m_ring_to_rx = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
+ create();
}
- if (no_stat_supported)
- return -ENOTSUP;
+ //??? put back assert(stream->m_rx_check.m_hw_id == HW_ID_INIT);
- Cxl710Parser parser;
- int ret;
+ uint16_t rule_type = TrexPlatformApi::IF_STAT_IPV4_ID; // In the future need to get it from the stream;
- if (! stream->m_rx_check.m_enabled) {
- return 0;
+ if ((m_capabilities & rule_type) == 0) {
+ fprintf(stderr, "Error: %s - rule type not supported by interface\n", __func__);
+ throw TrexException("Interface does not support given rule type");
}
- if ((ret = compile_stream(stream, parser)) < 0)
- return ret;
+ // compile_stream throws exception if something goes wrong
+ compile_stream(stream, m_parser);
uint8_t l4_proto;
- if (parser.get_l4_proto(l4_proto) < 0) {
- printf("Error: %s failed finding l4 proto\n", __func__);
- return -1;
+ if (m_parser->get_l4_proto(l4_proto) < 0) {
+ fprintf(stderr, "Error: %s failed finding l4 proto\n", __func__);
+ throw TrexException("Failed determining l4 proto for packet");
}
- return m_user_id_map.add_stream(stream->m_rx_check.m_pg_id, l4_proto);
+ // throws exception if there is error
+ m_user_id_map.add_stream(stream->m_rx_check.m_pg_id, l4_proto);
+
+ stream->m_rx_check.m_hw_id = HW_ID_FREE;
+ return 0;
}
-int CFlowStatRuleMgr::del_stream(const TrexStream * stream) {
+int CFlowStatRuleMgr::del_stream(TrexStream * stream) {
#ifdef __DEBUG_FUNC_ENTRY__
std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl;
+ stream_dump(stream);
#endif
- if (no_stat_supported)
- return -ENOTSUP;
-
if (! stream->m_rx_check.m_enabled) {
return 0;
}
- if (m_user_id_map.is_started(stream->m_rx_check.m_pg_id)) {
- std::cerr << "Error: Trying to delete flow statistics stream " << stream->m_rx_check.m_pg_id
- << " which is not stopped." << std::endl;
- return -1;
+ if (! m_api)
+ throw TrexException("Called del_stream, but no stream was added");
+
+ // we got del_stream command for a stream which has valid hw_id.
+ // Probably someone forgot to call stop
+ if(stream->m_rx_check.m_hw_id < MAX_FLOW_STATS) {
+ stop_stream(stream);
+ }
+
+ // calling del for same stream twice, or for a stream which was never "added"
+ if(stream->m_rx_check.m_hw_id == HW_ID_INIT) {
+ return 0;
}
+ // Throws exception in case of error
+ m_user_id_map.del_stream(stream->m_rx_check.m_pg_id);
+ stream->m_rx_check.m_hw_id = HW_ID_INIT;
- return m_user_id_map.del_stream(stream->m_rx_check.m_pg_id);
+ return 0;
}
// called on all streams, when stream start to transmit
@@ -509,46 +570,73 @@ int CFlowStatRuleMgr::del_stream(const TrexStream * stream) {
// If stream does not need flow stat counting, make sure it does not interfere with
// other streams that do need stat counting.
// Might change the IP ID of the stream packet
-int CFlowStatRuleMgr::start_stream(TrexStream * stream, uint16_t &ret_hw_id) {
+int CFlowStatRuleMgr::start_stream(TrexStream * stream) {
#ifdef __DEBUG_FUNC_ENTRY__
std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl;
+ stream_dump(stream);
#endif
- Cxl710Parser parser;
int ret;
-
- if (no_stat_supported)
- return -ENOTSUP;
-
- if ((ret = compile_stream(stream, parser)) < 0)
- return ret;
+ // Streams which does not need statistics might be started, before any stream that do
+ // need statistcs, so start_stream might be called before add_stream
+ if (! m_api ) {
+ create();
+ }
// first handle streams that do not need rx stat
if (! stream->m_rx_check.m_enabled) {
- // no need for stat count
+ try {
+ compile_stream(stream, m_parser);
+ } catch (TrexException) {
+ // If no statistics needed, and we can't parse the stream, that's OK.
+ return 0;
+ }
+
uint16_t ip_id;
- if (parser.get_ip_id(ip_id) < 0) {
- return 0; // if we could not find and ip id, no need to fix
+ if (m_parser->get_ip_id(ip_id) < 0) {
+ return 0; // if we could not find the ip id, no need to fix
}
// verify no reserved IP_ID used, and change if needed
if (ip_id >= IP_ID_RESERVE_BASE) {
- if (parser.set_ip_id(ip_id & 0xefff) < 0) {
- return -1;
+ if (m_parser->set_ip_id(ip_id & 0xefff) < 0) {
+ throw TrexException("Stream IP ID in reserved range. Failed changing it");
}
}
return 0;
}
- uint16_t hw_id;
// from here, we know the stream need rx stat
+
+ // Starting a stream which was never added
+ if (stream->m_rx_check.m_hw_id == HW_ID_INIT) {
+ add_stream(stream);
+ }
+
+ if (stream->m_rx_check.m_hw_id < MAX_FLOW_STATS) {
+ throw TrexException("Starting a stream which was already started");
+ }
+
+ uint16_t rule_type = TrexPlatformApi::IF_STAT_IPV4_ID; // In the future, need to get it from the stream;
+
+ if ((m_capabilities & rule_type) == 0) {
+ fprintf(stderr, "Error: %s - rule type not supported by interface\n", __func__);
+ throw TrexException("Interface does not support given rule type");
+ }
+
+ // compile_stream throws exception if something goes wrong
+ if ((ret = compile_stream(stream, m_parser)) < 0)
+ return ret;
+
+ uint16_t hw_id;
+
if (m_user_id_map.is_started(stream->m_rx_check.m_pg_id)) {
m_user_id_map.start_stream(stream->m_rx_check.m_pg_id); // just increase ref count;
hw_id = m_user_id_map.get_hw_id(stream->m_rx_check.m_pg_id); // can't fail if we got here
} else {
hw_id = m_hw_id_map.find_free_hw_id();
- if (hw_id == FREE_HW_ID) {
+ if (hw_id == HW_ID_FREE) {
printf("Error: %s failed finding free hw_id\n", __func__);
- return -1;
+ throw TrexException("Failed allocating statistic counter. Probably all are used.");
} else {
if (hw_id > m_max_hw_id) {
m_max_hw_id = hw_id;
@@ -557,19 +645,43 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream, uint16_t &ret_hw_id) {
m_user_id_map.start_stream(user_id, hw_id);
m_hw_id_map.map(hw_id, user_id);
add_hw_rule(hw_id, m_user_id_map.l4_proto(user_id));
+ // clear hardware counters. Just in case we have garbage from previous iteration
+ rx_per_flow_t rx_counter;
+ tx_per_flow_t tx_counter;
+ for (uint8_t port = 0; port < m_num_ports; port++) {
+ m_api->get_flow_stats(port, &rx_counter, (void *)&tx_counter, hw_id, hw_id, true);
+ }
}
}
- parser.set_ip_id(IP_ID_RESERVE_BASE + hw_id);
+ m_parser->set_ip_id(IP_ID_RESERVE_BASE + hw_id);
- ret_hw_id = hw_id;
+ // saving given hw_id on stream for use by tx statistics count
+ stream->m_rx_check.m_hw_id = hw_id;
#ifdef __DEBUG_FUNC_ENTRY__
- std::cout << "exit:" << __METHOD_NAME__ << " hw_id:" << ret_hw_id << std::endl;
+ std::cout << "exit:" << __METHOD_NAME__ << " hw_id:" << hw_id << std::endl;
+ stream_dump(stream);
#endif
if (m_num_started_streams == 0) {
send_start_stop_msg_to_rx(true); // First transmitting stream. Rx core should start reading packets;
+
+ // wait to make sure that message is acknowledged. RX core might be in deep sleep mode, and we want to
+ // start transmitting packets only after it is working, otherwise, packets will get lost.
+ if (m_rx_core) { // in simulation, m_rx_core will be NULL
+ int count = 0;
+ while (!m_rx_core->is_working()) {
+ delay(1);
+ count++;
+ if (count == 100) {
+ throw TrexException("Critical error!! - RX core failed to start");
+ }
+ }
+ }
+ } else {
+ // make sure rx core is working. If not, we got really confused somehow.
+ assert(m_rx_core->is_working());
}
m_num_started_streams++;
return 0;
@@ -583,17 +695,25 @@ int CFlowStatRuleMgr::add_hw_rule(uint16_t hw_id, uint8_t proto) {
return 0;
}
-int CFlowStatRuleMgr::stop_stream(const TrexStream * stream) {
+int CFlowStatRuleMgr::stop_stream(TrexStream * stream) {
#ifdef __DEBUG_FUNC_ENTRY__
std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl;
+ stream_dump(stream);
#endif
- if (no_stat_supported)
- return -ENOTSUP;
-
if (! stream->m_rx_check.m_enabled) {
return 0;
}
+ if (! m_api)
+ throw TrexException("Called stop_stream, but no stream was added");
+
+ if (stream->m_rx_check.m_hw_id >= MAX_FLOW_STATS) {
+ // We allow stopping while already stopped. Will not hurt us.
+ return 0;
+ }
+
+ stream->m_rx_check.m_hw_id = HW_ID_FREE;
+
if (m_user_id_map.stop_stream(stream->m_rx_check.m_pg_id) == 0) {
// last stream associated with the entry stopped transmittig.
// remove user_id <--> hw_id mapping
@@ -601,7 +721,7 @@ int CFlowStatRuleMgr::stop_stream(const TrexStream * stream) {
uint16_t hw_id = m_user_id_map.get_hw_id(stream->m_rx_check.m_pg_id);
if (hw_id >= MAX_FLOW_STATS) {
fprintf(stderr, "Error: %s got wrong hw_id %d from unmap\n", __func__, hw_id);
- return -1;
+ throw TrexException("Internal error in stop_stream. Got bad hw_id");
} else {
// update counters, and reset before unmapping
CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map.get_user_id(hw_id));
diff --git a/src/flow_stat.h b/src/flow_stat.h
index 83f076de..06b54d70 100644
--- a/src/flow_stat.h
+++ b/src/flow_stat.h
@@ -37,6 +37,8 @@
typedef std::map<uint32_t, uint16_t> flow_stat_map_t;
typedef std::map<uint32_t, uint16_t>::iterator flow_stat_map_it_t;
+class CRxCoreStateless;
+
class tx_per_flow_t_ {
public:
tx_per_flow_t_() {
@@ -104,7 +106,7 @@ typedef class tx_per_flow_t_ tx_per_flow_t;
typedef class tx_per_flow_t_ rx_per_flow_t;
class CPhyEthIF;
-class Cxl710Parser;
+class CFlowStatParser;
class CFlowStatUserIdInfo {
public:
@@ -198,16 +200,20 @@ class CFlowStatRuleMgr {
};
CFlowStatRuleMgr();
+ ~CFlowStatRuleMgr();
friend std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf);
- int add_stream(const TrexStream * stream);
- int del_stream(const TrexStream * stream);
- int start_stream(TrexStream * stream, uint16_t &ret_hw_id);
- int stop_stream(const TrexStream * stream);
+ void copy_state(TrexStream * from, TrexStream * to);
+ void init_stream(TrexStream * stream);
+ int add_stream(TrexStream * stream);
+ int del_stream(TrexStream * stream);
+ int start_stream(TrexStream * stream);
+ int stop_stream(TrexStream * stream);
int get_active_pgids(flow_stat_active_t &result);
bool dump_json(std::string & json, bool baseline);
private:
- int compile_stream(const TrexStream * stream, Cxl710Parser &parser);
+ void create();
+ int compile_stream(const TrexStream * stream, CFlowStatParser *parser);
int add_hw_rule(uint16_t hw_id, uint8_t proto);
void send_start_stop_msg_to_rx(bool is_start);
@@ -216,9 +222,12 @@ class CFlowStatRuleMgr {
CFlowStatUserIdMap m_user_id_map; // map user ids to hw ids
uint8_t m_num_ports; // How many ports are being used
const TrexPlatformApi *m_api;
+ const CRxCoreStateless *m_rx_core;
int m_max_hw_id; // max hw id we ever used
uint32_t m_num_started_streams; // How many started (transmitting) streams we have
CNodeRing *m_ring_to_rx; // handle for sending messages to Rx core
+ CFlowStatParser *m_parser;
+ uint16_t m_capabilities;
};
#endif
diff --git a/src/flow_stat_parser.cpp b/src/flow_stat_parser.cpp
index 52824f73..8cb41fb7 100644
--- a/src/flow_stat_parser.cpp
+++ b/src/flow_stat_parser.cpp
@@ -25,38 +25,42 @@
#include <common/Network/Packet/EthernetHeader.h>
#include <flow_stat_parser.h>
-Cxl710Parser::Cxl710Parser() {
- reset();
-}
-
-void Cxl710Parser::reset() {
+void CFlowStatParser::reset() {
m_ipv4 = 0;
m_l4_proto = 0;
- m_fdir_supported = false;
+ m_stat_supported = false;
}
-int Cxl710Parser::parse(uint8_t *p, uint16_t len) {
+int CFlowStatParser::parse(uint8_t *p, uint16_t len) {
EthernetHeader *ether = (EthernetHeader *)p;
+ int min_len = ETH_HDR_LEN + IPV4_HDR_LEN;
+ reset();
+
+ if (len < min_len)
+ return -1;
switch( ether->getNextProtocol() ) {
case EthernetHeader::Protocol::IP :
- m_ipv4 = (IPHeader *)(p + 14);
- m_fdir_supported = true;
+ m_ipv4 = (IPHeader *)(p + ETH_HDR_LEN);
+ m_stat_supported = true;
break;
case EthernetHeader::Protocol::VLAN :
+ min_len += 4;
+ if (len < min_len)
+ return -1;
switch ( ether->getVlanProtocol() ){
case EthernetHeader::Protocol::IP:
m_ipv4 = (IPHeader *)(p + 18);
- m_fdir_supported = true;
+ m_stat_supported = true;
break;
default:
- m_fdir_supported = false;
+ m_stat_supported = false;
return -1;
}
break;
default:
- m_fdir_supported = false;
+ m_stat_supported = false;
return -1;
break;
}
@@ -64,7 +68,7 @@ int Cxl710Parser::parse(uint8_t *p, uint16_t len) {
return 0;
}
-int Cxl710Parser::get_ip_id(uint16_t &ip_id) {
+int CFlowStatParser::get_ip_id(uint16_t &ip_id) {
if (! m_ipv4)
return -1;
@@ -73,18 +77,18 @@ int Cxl710Parser::get_ip_id(uint16_t &ip_id) {
return 0;
}
-int Cxl710Parser::set_ip_id(uint16_t new_id) {
+int CFlowStatParser::set_ip_id(uint16_t new_id) {
if (! m_ipv4)
return -1;
// Updating checksum, not recalculating, so if someone put bad checksum on purpose, it will stay bad
- m_ipv4->updateCheckSum(m_ipv4->getId(), PKT_NTOHS(new_id));
+ m_ipv4->updateCheckSum(PKT_NTOHS(m_ipv4->getId()), PKT_NTOHS(new_id));
m_ipv4->setId(new_id);
return 0;
}
-int Cxl710Parser::get_l4_proto(uint8_t &proto) {
+int CFlowStatParser::get_l4_proto(uint8_t &proto) {
if (! m_ipv4)
return -1;
@@ -96,7 +100,7 @@ int Cxl710Parser::get_l4_proto(uint8_t &proto) {
static const uint16_t TEST_IP_ID = 0xabcd;
static const uint8_t TEST_L4_PROTO = 0x11;
-int Cxl710Parser::test() {
+int CFlowStatParser::test() {
uint16_t ip_id = 0;
uint8_t l4_proto;
uint8_t test_pkt[] = {
@@ -107,7 +111,7 @@ int Cxl710Parser::test() {
0x0a, 0xbc, 0x08, 0x00, // vlan
// IP header
0x45,0x02,0x00,0x30,
- 0x00,0x00,0x40,0x00,
+ 0x01,0x02,0x40,0x00,
0xff, TEST_L4_PROTO, 0xbd,0x04,
0x10,0x0,0x0,0x1,
0x30,0x0,0x0,0x1,
@@ -124,14 +128,37 @@ int Cxl710Parser::test() {
assert(m_ipv4->isChecksumOK() == true);
assert(get_l4_proto(l4_proto) == 0);
assert(l4_proto == TEST_L4_PROTO);
- assert(m_fdir_supported == true);
+ assert(m_stat_supported == true);
reset();
// bad packet
test_pkt[16] = 0xaa;
assert (parse(test_pkt, sizeof(test_pkt)) == -1);
- assert(m_fdir_supported == false);
+ assert(m_stat_supported == false);
+
+ return 0;
+}
+
+// In 82599 10G card we do not support VLANs
+int C82599Parser::parse(uint8_t *p, uint16_t len) {
+ EthernetHeader *ether = (EthernetHeader *)p;
+ int min_len = ETH_HDR_LEN + IPV4_HDR_LEN;
+ reset();
+
+ if (len < min_len)
+ return -1;
+
+ switch( ether->getNextProtocol() ) {
+ case EthernetHeader::Protocol::IP :
+ m_ipv4 = (IPHeader *)(p + ETH_HDR_LEN);
+ m_stat_supported = true;
+ break;
+ default:
+ m_stat_supported = false;
+ return -1;
+ break;
+ }
return 0;
}
diff --git a/src/flow_stat_parser.h b/src/flow_stat_parser.h
index 606a1bec..8c9e1418 100644
--- a/src/flow_stat_parser.h
+++ b/src/flow_stat_parser.h
@@ -19,19 +19,33 @@
limitations under the License.
*/
-class Cxl710Parser {
+#ifndef __FLOW_STAT_PARSER_H__
+#define __FLOW_STAT_PARSER_H__
+
+// Basic flow stat parser. Relevant for xl710/x710/x350 cards
+#include "common/Network/Packet/IPHeader.h"
+
+class CFlowStatParser {
public:
- Cxl710Parser();
- void reset();
- int parse(uint8_t *pkt, uint16_t len);
- bool is_fdir_supported() {return m_fdir_supported == true;};
- int get_ip_id(uint16_t &ip_id);
- int set_ip_id(uint16_t ip_id);
- int get_l4_proto(uint8_t &proto);
- int test();
+ virtual ~CFlowStatParser() {};
+ virtual void reset();
+ virtual int parse(uint8_t *pkt, uint16_t len);
+ virtual bool is_stat_supported() {return m_stat_supported == true;};
+ virtual int get_ip_id(uint16_t &ip_id);
+ virtual int set_ip_id(uint16_t ip_id);
+ virtual int get_l4_proto(uint8_t &proto);
+ virtual int test();
- private:
+ protected:
IPHeader *m_ipv4;
- bool m_fdir_supported;
+ bool m_stat_supported;
uint8_t m_l4_proto;
};
+
+class C82599Parser : public CFlowStatParser {
+ public:
+ ~C82599Parser() {};
+ int parse(uint8_t *pkt, uint16_t len);
+};
+
+#endif
diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp
index c3dfcb95..a5cf3307 100644
--- a/src/gtest/trex_stateless_gtest.cpp
+++ b/src/gtest/trex_stateless_gtest.cpp
@@ -3581,7 +3581,7 @@ class rx_stat_pkt_parse : public testing::Test {
TEST_F(rx_stat_pkt_parse, x710_parser) {
- Cxl710Parser parser;
+ CFlowStatParser parser;
parser.test();
}
diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h
index dbca5a8a..90eaa7c7 100644
--- a/src/internal_api/trex_platform_api.h
+++ b/src/internal_api/trex_platform_api.h
@@ -26,6 +26,7 @@ limitations under the License.
#include <vector>
#include <string>
#include <string.h>
+#include "flow_stat_parser.h"
#include "trex_defs.h"
/**
@@ -34,6 +35,7 @@ limitations under the License.
* @author imarom (06-Oct-15)
*/
+
class TrexPlatformGlobalStats {
public:
TrexPlatformGlobalStats() {
@@ -151,6 +153,7 @@ public:
virtual bool get_promiscuous(uint8_t port_id) const = 0;
virtual void flush_dp_messages() const = 0;
virtual int get_active_pgids(flow_stat_active_t &result) const = 0;
+ virtual CFlowStatParser *get_flow_stat_parser() const = 0;
virtual ~TrexPlatformApi() {}
};
@@ -180,6 +183,7 @@ public:
bool get_promiscuous(uint8_t port_id) const;
void flush_dp_messages() const;
int get_active_pgids(flow_stat_active_t &result) const;
+ CFlowStatParser *get_flow_stat_parser() const;
};
@@ -241,6 +245,7 @@ public:
void flush_dp_messages() const {
}
int get_active_pgids(flow_stat_active_t &result) const {return 0;}
+ CFlowStatParser *get_flow_stat_parser() const {return new CFlowStatParser();}
private:
int m_dp_core_count;
diff --git a/src/latency.h b/src/latency.h
index 3dd1cc36..f5f90cf9 100644
--- a/src/latency.h
+++ b/src/latency.h
@@ -22,6 +22,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
#include <bp_sim.h>
+#include <flow_stat.h>
#define L_PKT_SUBMODE_NO_REPLY 1
#define L_PKT_SUBMODE_REPLY 2
diff --git a/src/main.cpp b/src/main.cpp
index 6a6b5721..3c68990c 100755
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -253,6 +253,10 @@ TrexStateless * get_stateless_obj() {
return m_sim_statelss_obj;
}
+CRxCoreStateless * get_rx_sl_core_obj() {
+ return NULL;
+}
+
void set_stateless_obj(TrexStateless *obj) {
m_sim_statelss_obj = obj;
}
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index ee408c63..1f415958 100644
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -148,6 +148,7 @@ public:
virtual int dump_fdir_global_stats(CPhyEthIF * _if, FILE *fd) { return -1;}
virtual int get_stat_counters_num() {return 0;}
virtual int get_rx_stat_capabilities() {return 0;}
+ virtual CFlowStatParser *get_flow_stat_parser();
};
@@ -281,6 +282,7 @@ public:
virtual int wait_for_stable_link();
virtual int get_stat_counters_num() {return MAX_FLOW_STATS;}
virtual int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;}
+ virtual CFlowStatParser *get_flow_stat_parser();
};
class CTRexExtendedDriverBase40G : public CTRexExtendedDriverBase10G {
@@ -332,9 +334,12 @@ public:
// disabling flow control on 40G using DPDK API causes the interface to malfunction
virtual bool flow_control_disable_supported(){return false;}
virtual bool hw_rx_stat_supported(){return true;}
+ virtual CFlowStatParser *get_flow_stat_parser();
+
private:
virtual void add_del_rules(enum rte_filter_op op, uint8_t port_id, uint16_t type, uint8_t ttl, uint16_t ip_id, int queue, uint16_t stat_idx);
virtual int configure_rx_filter_rules_statfull(CPhyEthIF * _if);
+
private:
uint8_t m_if_per_card;
};
@@ -1231,6 +1236,10 @@ void CPhyEthIFStats::Dump(FILE *fd){
DP_A(rx_nombuf);
}
+// only on VM we have rx queues on DP cores
+void CPhyEthIF::flush_dp_rx_queue(void) {
+}
+
// Clear the RX queue of an interface, dropping all packets
void CPhyEthIF::flush_rx_queue(void){
@@ -1735,8 +1744,8 @@ public:
virtual int send_node(CGenNode * node);
virtual void send_one_pkt(pkt_dir_t dir, rte_mbuf_t *m);
+ virtual void flush_dp_rx_queue(void);
virtual int flush_tx_queue(void);
-
__attribute__ ((noinline)) void flush_rx_queue();
__attribute__ ((noinline)) void update_mac_addr(CGenNode * node,uint8_t *p);
@@ -1804,6 +1813,11 @@ bool CCoreEthIF::Create(uint8_t core_id,
return (true);
}
+// On VM, we get the packets in dp core, so just call general flush_rx_queue
+void CCoreEthIF::flush_dp_rx_queue(void) {
+ flush_rx_queue();
+}
+
// This function is only relevant if we are in VM. In this case, we only have one rx queue. Can't have
// rules to drop queue 0, and pass queue 1 to RX core, like in other cases.
// We receive all packets in the same core that transmitted, and handle them to RX core.
@@ -2699,7 +2713,7 @@ public:
CFlowGenList m_fl;
bool m_fl_was_init;
volatile uint8_t m_signal[BP_MAX_CORES] __rte_cache_aligned ; // Signal to main core when DP thread finished
- volatile bool m_rx_running; // Signal main core when RX thread finished
+ volatile bool m_sl_rx_running; // Signal main core when RX thread finished
CLatencyManager m_mg; // statefull RX core
CRxCoreStateless m_rx_sl; // stateless RX core
CTrexGlobalIoMode m_io_modes;
@@ -2793,7 +2807,9 @@ void CGlobalTRex::try_stop_all_cores(){
TrexStatelessDpQuit * dp_msg= new TrexStatelessDpQuit();
TrexStatelessRxQuit * rx_msg= new TrexStatelessRxQuit();
send_message_all_dp(dp_msg);
- send_message_to_rx(rx_msg);
+ if (get_is_stateless()) {
+ send_message_to_rx(rx_msg);
+ }
delete dp_msg;
// no need to delete rx_msg. Deleted by receiver
bool all_core_finished = false;
@@ -3804,16 +3820,16 @@ int CGlobalTRex::run_in_master() {
int CGlobalTRex::run_in_rx_core(void){
if (get_is_stateless()) {
- m_rx_running = true;
+ m_sl_rx_running = true;
m_rx_sl.start();
+ m_sl_rx_running = false;
} else {
if ( CGlobalInfo::m_options.is_rx_enabled() ){
- m_rx_running = true;
+ m_sl_rx_running = false;
m_mg.start(0);
}
}
- m_rx_running = false;
return (0);
}
@@ -3905,7 +3921,7 @@ bool CGlobalTRex::is_all_cores_finished() {
return false;
}
}
- if (m_rx_running)
+ if (m_sl_rx_running)
return false;
return true;
@@ -4116,11 +4132,14 @@ bool CCoreEthIF::process_rx_pkt(pkt_dir_t dir,
return (send);
}
-
TrexStateless * get_stateless_obj() {
return g_trex.m_trex_stateless;
}
+CRxCoreStateless * get_rx_sl_core_obj() {
+ return &g_trex.m_rx_sl;
+}
+
static int latency_one_lcore(__attribute__((unused)) void *dummy)
{
CPlatformSocketInfo * lpsock=&CGlobalInfo::m_socket;
@@ -4274,12 +4293,19 @@ int core_mask_sanity(uint32_t wanted_core_mask) {
wanted_core_num = num_set_bits(wanted_core_mask);
calc_core_num = num_set_bits(calc_core_mask);
+ if (calc_core_num == 1) {
+ printf ("Error: You have only 1 core available. Minimum configuration requires 2 cores\n");
+ printf(" If you are running on VM, consider adding more cores if possible\n");
+ return -1;
+ }
if (wanted_core_num > calc_core_num) {
printf("Error: You have %d threads available, but you asked for %d threads.\n", calc_core_num, wanted_core_num);
printf(" Calculation is: -c <num>(%d) * dual ports (%d) + 1 master thread %s"
, CGlobalInfo::m_options.preview.getCores(), CGlobalInfo::m_options.get_expected_dual_ports()
, get_is_rx_thread_enabled() ? "+1 latency thread (because of -l flag)\n" : "\n");
- printf(" Maybe try smaller -c <num>.\n");
+ if (CGlobalInfo::m_options.preview.getCores() > 1)
+ printf(" Maybe try smaller -c <num>.\n");
+ printf(" If you are running on VM, consider adding more cores if possible\n");
return -1;
}
@@ -4483,7 +4509,7 @@ int main_test(int argc , char * argv[]){
g_trex.reset_counters();
}
- g_trex.m_rx_running = false;
+ g_trex.m_sl_rx_running = false;
if ( get_is_stateless() ) {
g_trex.start_master_stateless();
@@ -4537,6 +4563,12 @@ int CTRexExtendedDriverBase::configure_drop_queue(CPhyEthIF * _if) {
return (rte_eth_dev_rx_queue_stop(port_id, 0));
}
+CFlowStatParser *CTRexExtendedDriverBase::get_flow_stat_parser() {
+ CFlowStatParser *parser = new CFlowStatParser();
+ assert (parser);
+ return parser;
+}
+
void wait_x_sec(int sec) {
int i;
printf(" wait %d sec ", sec);
@@ -4940,6 +4972,12 @@ int CTRexExtendedDriverBase10G::wait_for_stable_link(){
return (0);
}
+CFlowStatParser *CTRexExtendedDriverBase10G::get_flow_stat_parser() {
+ CFlowStatParser *parser = new C82599Parser();
+ assert (parser);
+ return parser;
+}
+
////////////////////////////////////////////////////////////////////////////////
void CTRexExtendedDriverBase40G::clear_extended_stats(CPhyEthIF * _if){
rte_eth_stats_reset(_if->get_port_id());
@@ -5167,6 +5205,12 @@ int CTRexExtendedDriverBase40G::wait_for_stable_link(){
return (0);
}
+CFlowStatParser *CTRexExtendedDriverBase40G::get_flow_stat_parser() {
+ CFlowStatParser *parser = new CFlowStatParser();
+ assert (parser);
+ return parser;
+}
+
/////////////////////////////////////////////////////////////////////
@@ -5407,3 +5451,8 @@ void TrexDpdkPlatformApi::flush_dp_messages() const {
int TrexDpdkPlatformApi::get_active_pgids(flow_stat_active_t &result) const {
return g_trex.m_trex_stateless->m_rx_flow_stat.get_active_pgids(result);
}
+
+CFlowStatParser *TrexDpdkPlatformApi::get_flow_stat_parser() const {
+ return CTRexExtendedDriverDb::Ins()->get_drv()
+ ->get_flow_stat_parser();
+}
diff --git a/src/main_dpdk.h b/src/main_dpdk.h
index ff1ea784..a9bfed39 100644
--- a/src/main_dpdk.h
+++ b/src/main_dpdk.h
@@ -122,6 +122,7 @@ class CPhyEthIF {
CPhyEthIFStats & get_stats(){
return ( m_stats );
}
+ void flush_dp_rx_queue(void);
void flush_rx_queue(void);
int add_rx_flow_stat_rule(uint8_t type, uint16_t proto, uint16_t id);
int del_rx_flow_stat_rule(uint8_t type, uint16_t proto, uint16_t id);
diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
index f054c0ed..f7a23188 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
@@ -38,6 +38,50 @@ limitations under the License.
using namespace std;
/**
+ * API sync
+ */
+trex_rpc_cmd_rc_e
+TrexRpcCmdAPISync::_run(const Json::Value &params, Json::Value &result) {
+ const Json::Value &api_vers = parse_array(params, "api_vers", result);
+
+ Json::Value api_ver_rc = Json::arrayValue;
+
+ /* for every element in the list - generate the appropirate API handler */
+ for (const auto api_ver : api_vers) {
+ Json::Value single_rc;
+
+ /* only those are supported */
+ const std::string type = parse_choice(api_ver, "type", {"core"}, result);
+
+ int major = parse_int(api_ver, "major", result);
+ int minor = parse_int(api_ver, "minor", result);
+ APIClass::type_e api_type;
+
+ /* decode type of API */
+ if (type == "core") {
+ api_type = APIClass::API_CLASS_TYPE_CORE;
+ }
+
+ single_rc["type"] = type;
+
+ /* this section might throw exception in case versions do not match */
+ try {
+ single_rc["api_h"] = get_stateless_obj()->verify_api(api_type, major, minor);
+
+ } catch (const TrexAPIException &e) {
+ generate_execute_err(result, e.what());
+ }
+
+ /* add to the response */
+ api_ver_rc.append(single_rc);
+ }
+
+ result["result"]["api_vers"] = api_ver_rc;
+
+ return (TREX_RPC_CMD_OK);
+}
+
+/**
* ping command
*/
trex_rpc_cmd_rc_e
diff --git a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
index 68bebeb6..40719325 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
@@ -545,7 +545,11 @@ TrexRpcCmdStartTraffic::_run(const Json::Value &params, Json::Value &result) {
std::string type = parse_choice(mul_obj, "type", TrexPortMultiplier::g_types, result);
std::string op = parse_string(mul_obj, "op", result);
double value = parse_double(mul_obj, "value", result);
-
+
+ if ( value <=0 ){
+ generate_parse_err(result, "multiplier can't be zero");
+ }
+
if (op != "abs") {
generate_parse_err(result, "start message can only specify absolute speed rate");
}
@@ -586,6 +590,27 @@ TrexRpcCmdStopTraffic::_run(const Json::Value &params, Json::Value &result) {
}
/***************************
+ * remove all hardware filters
+ *
+ **************************/
+trex_rpc_cmd_rc_e
+TrexRpcCmdRemoveRXFilters::_run(const Json::Value &params, Json::Value &result) {
+
+ uint8_t port_id = parse_port(params, result);
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id);
+
+ try {
+ port->remove_rx_filters();
+ } catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
+
+ result["result"] = Json::objectValue;
+
+ return (TREX_RPC_CMD_OK);
+}
+
+/***************************
* get all streams
*
**************************/
diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h
index c4b01b85..428bdd7b 100644
--- a/src/rpc-server/commands/trex_rpc_cmds.h
+++ b/src/rpc-server/commands/trex_rpc_cmds.h
@@ -36,33 +36,39 @@ class TrexStream;
* syntactic sugar for creating a simple command
*/
-#define TREX_RPC_CMD_DEFINE_EXTENDED(class_name, cmd_name, param_count, needs_ownership, ext) \
- class class_name : public TrexRpcCommand { \
- public: \
- class_name () : TrexRpcCommand(cmd_name, param_count, needs_ownership) {} \
- protected: \
- virtual trex_rpc_cmd_rc_e _run(const Json::Value &params, Json::Value &result); \
- ext \
+#define TREX_RPC_CMD_DEFINE_EXTENDED(class_name, cmd_name, param_count, needs_ownership, api_type, ext) \
+ class class_name : public TrexRpcCommand { \
+ public: \
+ class_name () : TrexRpcCommand(cmd_name, param_count, needs_ownership, api_type) {} \
+ protected: \
+ virtual trex_rpc_cmd_rc_e _run(const Json::Value &params, Json::Value &result); \
+ ext \
}
-#define TREX_RPC_CMD_DEFINE(class_name, cmd_name, param_count, needs_ownership) TREX_RPC_CMD_DEFINE_EXTENDED(class_name, cmd_name, param_count, needs_ownership, ;)
+#define TREX_RPC_CMD_DEFINE(class_name, cmd_name, param_count, needs_ownership, api_type) TREX_RPC_CMD_DEFINE_EXTENDED(class_name, cmd_name, param_count, needs_ownership, api_type, ;)
/**
* test cmds
*/
-TREX_RPC_CMD_DEFINE(TrexRpcCmdTestAdd, "test_add", 2, false);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdTestSub, "test_sub", 2, false);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdTestAdd, "test_add", 2, false, APIClass::API_CLASS_TYPE_NO_API);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdTestSub, "test_sub", 2, false, APIClass::API_CLASS_TYPE_NO_API);
+
+/**
+ * api_sync command always present and valid and also ping....
+ */
+TREX_RPC_CMD_DEFINE(TrexRpcCmdAPISync, "api_sync", 1, false, APIClass::API_CLASS_TYPE_NO_API);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdPing, "ping", 0, false, APIClass::API_CLASS_TYPE_NO_API);
/**
* general cmds
*/
-TREX_RPC_CMD_DEFINE(TrexRpcCmdPing, "ping", 0, false);
-TREX_RPC_CMD_DEFINE(TrexRpcPublishNow, "publish_now", 2, false);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdGetCmds, "get_supported_cmds", 0, false);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdGetVersion, "get_version", 0, false);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdGetActivePGIds, "get_active_pgids",0, false);
-TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdGetSysInfo, "get_system_info", 0, false,
+TREX_RPC_CMD_DEFINE(TrexRpcPublishNow, "publish_now", 2, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetCmds, "get_supported_cmds", 0, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetVersion, "get_version", 0, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetActivePGIds, "get_active_pgids", 0, false, APIClass::API_CLASS_TYPE_CORE);
+
+TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdGetSysInfo, "get_system_info", 0, false, APIClass::API_CLASS_TYPE_CORE,
std::string get_cpu_model();
void get_hostname(std::string &hostname);
@@ -72,25 +78,25 @@ void get_hostname(std::string &hostname);
/**
* ownership
*/
-TREX_RPC_CMD_DEFINE(TrexRpcCmdGetOwner, "get_owner", 1, false);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdAcquire, "acquire", 4, false);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdRelease, "release", 1, true);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetOwner, "get_owner", 1, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdAcquire, "acquire", 4, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdRelease, "release", 1, true, APIClass::API_CLASS_TYPE_CORE);
/**
* port commands
*/
-TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortStats, "get_port_stats", 1, false);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortStatus, "get_port_status", 1, false);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdSetPortAttr, "set_port_attr", 3, false);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortStats, "get_port_stats", 1, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortStatus, "get_port_status", 1, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdSetPortAttr, "set_port_attr", 3, false, APIClass::API_CLASS_TYPE_CORE);
/**
* stream cmds
*/
-TREX_RPC_CMD_DEFINE(TrexRpcCmdRemoveAllStreams, "remove_all_streams", 1, true);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdRemoveStream, "remove_stream", 2, true);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdRemoveAllStreams, "remove_all_streams", 1, true, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdRemoveStream, "remove_stream", 2, true, APIClass::API_CLASS_TYPE_CORE);
-TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdAddStream, "add_stream", 3, true,
+TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdAddStream, "add_stream", 3, true, APIClass::API_CLASS_TYPE_CORE,
/* extended part */
std::unique_ptr<TrexStream> allocate_new_stream(const Json::Value &section, uint8_t port_id, uint32_t stream_id, Json::Value &result);
@@ -107,20 +113,22 @@ void parse_vm_instr_write_mask_flow_var(const Json::Value &inst, std::unique_ptr
);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStreamList, "get_stream_list", 1, false);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdGetAllStreams, "get_all_streams", 1, false);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStreamList, "get_stream_list", 1, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetAllStreams, "get_all_streams", 1, false, APIClass::API_CLASS_TYPE_CORE);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStream, "get_stream", 3, false);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStream, "get_stream", 3, false, APIClass::API_CLASS_TYPE_CORE);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdStartTraffic, "start_traffic", 4, true);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdStopTraffic, "stop_traffic", 1, true);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdPauseTraffic, "pause_traffic", 1, true);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdResumeTraffic, "resume_traffic", 1, true);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdStartTraffic, "start_traffic", 4, true, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdStopTraffic, "stop_traffic", 1, true, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdRemoveRXFilters, "remove_rx_filters", 1, true, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdPauseTraffic, "pause_traffic", 1, true, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdResumeTraffic, "resume_traffic", 1, true, APIClass::API_CLASS_TYPE_CORE);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdUpdateTraffic, "update_traffic", 3, true);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdUpdateTraffic, "update_traffic", 3, true, APIClass::API_CLASS_TYPE_CORE);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdValidate, "validate", 2, false);
+TREX_RPC_CMD_DEFINE(TrexRpcCmdValidate, "validate", 2, false, APIClass::API_CLASS_TYPE_CORE);
#endif /* __TREX_RPC_CMD_H__ */
+
diff --git a/src/rpc-server/trex_rpc_cmd.cpp b/src/rpc-server/trex_rpc_cmd.cpp
index caf161e3..902e63c7 100644
--- a/src/rpc-server/trex_rpc_cmd.cpp
+++ b/src/rpc-server/trex_rpc_cmd.cpp
@@ -23,6 +23,32 @@ limitations under the License.
#include <trex_stateless.h>
#include <trex_stateless_port.h>
+/**
+ * method name and params
+ *
+ */
+TrexRpcCommand::TrexRpcCommand(const std::string &method_name,
+ int param_count,
+ bool needs_ownership,
+ APIClass::type_e type) : m_name(method_name),
+ m_param_count(param_count),
+ m_needs_ownership(needs_ownership) {
+
+ /* if needs ownership - another field is needed (handler) */
+ if (m_needs_ownership) {
+ m_param_count++;
+ }
+
+ /* API verification */
+ m_api_type = type;
+
+ if (type != APIClass::API_CLASS_TYPE_NO_API) {
+ m_api_handler = get_stateless_obj()->get_api_handler(type);
+ m_param_count++;
+ }
+
+}
+
trex_rpc_cmd_rc_e
TrexRpcCommand::run(const Json::Value &params, Json::Value &result) {
trex_rpc_cmd_rc_e rc;
@@ -30,12 +56,18 @@ TrexRpcCommand::run(const Json::Value &params, Json::Value &result) {
/* the internal run can throw a parser error / other error */
try {
- check_param_count(params, m_param_count, result);
+ /* verify API handler is correct (version mismatch) */
+ if ( (m_api_type != APIClass::API_CLASS_TYPE_NO_API) && !g_test_override_api ) {
+ verify_api_handler(params, result);
+ }
+ /* verify ownership */
if (m_needs_ownership && !g_test_override_ownership) {
verify_ownership(params, result);
}
+ check_param_count(params, m_param_count, result);
+
/* run the command itself*/
rc = _run(params, result);
@@ -72,6 +104,17 @@ TrexRpcCommand::verify_ownership(const Json::Value &params, Json::Value &result)
}
}
+void
+TrexRpcCommand::verify_api_handler(const Json::Value &params, Json::Value &result) {
+ std::string api_handler = parse_string(params, "api_h", result);
+
+ if (m_api_handler != api_handler) {
+ std::stringstream ss;
+ ss << "API verification failed - API handler provided mismatch for class: '" << APIClass::type_to_name(m_api_type) << "'";
+ generate_execute_err(result, ss.str());
+ }
+}
+
uint8_t
TrexRpcCommand::parse_port(const Json::Value &params, Json::Value &result) {
uint8_t port_id = parse_byte(params, "port_id", result);
@@ -281,3 +324,4 @@ TrexRpcCommand::generate_execute_err(Json::Value &result, const std::string &msg
* by default this is off
*/
bool TrexRpcCommand::g_test_override_ownership = false;
+bool TrexRpcCommand::g_test_override_api = false;
diff --git a/src/rpc-server/trex_rpc_cmd_api.h b/src/rpc-server/trex_rpc_cmd_api.h
index 7e694768..25920c6c 100644
--- a/src/rpc-server/trex_rpc_cmd_api.h
+++ b/src/rpc-server/trex_rpc_cmd_api.h
@@ -27,6 +27,8 @@ limitations under the License.
#include <json/json.h>
#include <trex_rpc_exception_api.h>
+#include "trex_api_class.h"
+
/**
* describe different types of rc for run()
*/
@@ -68,16 +70,10 @@ public:
/**
* method name and params
*/
- TrexRpcCommand(const std::string &method_name, int param_count, bool needs_ownership) :
- m_name(method_name),
- m_param_count(param_count),
- m_needs_ownership(needs_ownership) {
-
- /* if needs ownership - another field is needed (handler) */
- if (m_needs_ownership) {
- m_param_count++;
- }
- }
+ TrexRpcCommand(const std::string &method_name,
+ int param_count,
+ bool needs_ownership,
+ APIClass::type_e type);
/**
* entry point for executing RPC command
@@ -99,6 +95,10 @@ public:
g_test_override_ownership = enable;
}
+ static void test_set_override_api(bool enable) {
+ g_test_override_api = enable;
+ }
+
virtual ~TrexRpcCommand() {}
protected:
@@ -131,11 +131,18 @@ protected:
void check_param_count(const Json::Value &params, int expected, Json::Value &result);
/**
+ * verify API handler
+ *
+ */
+ void verify_api_handler(const Json::Value &params, Json::Value &result);
+
+ /**
* verify ownership
*
*/
void verify_ownership(const Json::Value &params, Json::Value &result);
+
/**
* validate port id
*
@@ -360,11 +367,13 @@ protected:
const char * json_type_to_name(const Json::Value &value);
/* RPC command name */
- std::string m_name;
- int m_param_count;
- bool m_needs_ownership;
-
- static bool g_test_override_ownership;
+ std::string m_name;
+ int m_param_count;
+ bool m_needs_ownership;
+ std::string m_api_handler;
+ APIClass::type_e m_api_type;
+ static bool g_test_override_ownership;
+ static bool g_test_override_api;
};
#endif /* __TREX_RPC_CMD_API_H__ */
diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp
index e1bd3eee..924503f2 100644
--- a/src/rpc-server/trex_rpc_cmds_table.cpp
+++ b/src/rpc-server/trex_rpc_cmds_table.cpp
@@ -33,6 +33,7 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() {
/* general */
+ register_command(new TrexRpcCmdAPISync());
register_command(new TrexRpcCmdPing());
register_command(new TrexRpcPublishNow());
register_command(new TrexRpcCmdGetCmds());
@@ -61,6 +62,8 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() {
register_command(new TrexRpcCmdResumeTraffic());
register_command(new TrexRpcCmdUpdateTraffic());
+ register_command(new TrexRpcCmdRemoveRXFilters());
+
register_command(new TrexRpcCmdValidate());
}
diff --git a/src/rpc-server/trex_rpc_exception_api.h b/src/rpc-server/trex_rpc_exception_api.h
index e349b980..ebc9b411 100644
--- a/src/rpc-server/trex_rpc_exception_api.h
+++ b/src/rpc-server/trex_rpc_exception_api.h
@@ -25,17 +25,19 @@ limitations under the License.
#include <string>
#include <stdexcept>
+#include "trex_exception.h"
+
/**
* generic exception for RPC errors
*
*/
-class TrexRpcException : public std::runtime_error
-{
+class TrexRpcException : public TrexException {
+
public:
- TrexRpcException() : std::runtime_error("") {
+ TrexRpcException() : TrexException("") {
}
- TrexRpcException(const std::string &what) : std::runtime_error(what) {
+ TrexRpcException(const std::string &what) : TrexException(what) {
}
};
diff --git a/src/sim/trex_sim_stateless.cpp b/src/sim/trex_sim_stateless.cpp
index ffe377f4..fa13401d 100644
--- a/src/sim/trex_sim_stateless.cpp
+++ b/src/sim/trex_sim_stateless.cpp
@@ -117,6 +117,7 @@ SimStateless::SimStateless() {
/* override ownership checks */
TrexRpcCommand::test_set_override_ownership(true);
+ TrexRpcCommand::test_set_override_api(true);
}
diff --git a/src/stateless/cp/trex_api_class.h b/src/stateless/cp/trex_api_class.h
new file mode 100644
index 00000000..78933d23
--- /dev/null
+++ b/src/stateless/cp/trex_api_class.h
@@ -0,0 +1,110 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#ifndef __TREX_API_CLASS_H__
+#define __TREX_API_CLASS_H__
+
+#include <assert.h>
+
+#include "common/basic_utils.h"
+#include "trex_exception.h"
+
+/**
+ * API exception
+ *
+ * @author imarom (03-Apr-16)
+ */
+class TrexAPIException : public TrexException {
+public:
+ TrexAPIException(const std::string &what) : TrexException(what) {
+ }
+};
+
+/**
+ * define an API class
+ *
+ * @author imarom (03-Apr-16)
+ */
+class APIClass {
+public:
+
+ enum type_e {
+ API_CLASS_TYPE_CORE = 0,
+ API_CLASS_TYPE_MAX,
+
+ API_CLASS_TYPE_NO_API
+ };
+
+ static const char * type_to_name(type_e type) {
+ switch (type) {
+ case API_CLASS_TYPE_CORE:
+ return "core";
+ default:
+ assert(0);
+ }
+ }
+
+ APIClass() {
+ /* invalid */
+ m_type = API_CLASS_TYPE_MAX;
+ }
+
+ void init(type_e type, int major, int minor) {
+ m_type = type;
+ m_major = major;
+ m_minor = minor;
+
+ unsigned int seed = time(NULL);
+ m_handler = utl_generate_random_str(seed, 8);
+ }
+
+ std::string & verify_api(int major, int minor) {
+ std::stringstream ss;
+ ss << "API type '" << type_to_name(m_type) << "': ";
+
+ assert(m_type < API_CLASS_TYPE_MAX);
+
+ /* for now a simple major check */
+ if (major < m_major) {
+ ss << "server has a major newer API version - server: '" << m_major << "', client: '" << major << "'";
+ throw TrexAPIException(ss.str());
+ }
+
+ if (major > m_major) {
+ ss << "server has an older API version - server: '" << m_major << "', client: '" << major << "'";
+ throw TrexAPIException(ss.str());
+ }
+
+ return get_api_handler();
+ }
+
+ std::string & get_api_handler() {
+ return m_handler;
+ }
+
+private:
+ type_e m_type;
+ int m_major;
+ int m_minor;
+ std::string m_handler;
+
+};
+
+#endif /* __TREX_API_CLASS_H__ */
diff --git a/src/stateless/cp/trex_exception.h b/src/stateless/cp/trex_exception.h
new file mode 100644
index 00000000..b9e20761
--- /dev/null
+++ b/src/stateless/cp/trex_exception.h
@@ -0,0 +1,41 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#ifndef __TREX_EXCEPTION_H__
+#define __TREX_EXCEPTION_H__
+
+#include <stdexcept>
+#include <string>
+
+/**
+ * generic exception for errors
+ * TODO: move this to a better place
+ */
+class TrexException : public std::runtime_error
+{
+public:
+ TrexException() : std::runtime_error("") {
+
+ }
+ TrexException(const std::string &what) : std::runtime_error(what) {
+ }
+};
+
+#endif /* __TREX_EXCEPTION_H__ */
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp
index 9df57a50..f6f81b96 100644
--- a/src/stateless/cp/trex_stateless.cpp
+++ b/src/stateless/cp/trex_stateless.cpp
@@ -53,6 +53,8 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) {
m_platform_api = cfg.m_platform_api;
m_publisher = cfg.m_publisher;
+ /* API core version */
+ m_api_classes[APIClass::API_CLASS_TYPE_CORE].init(APIClass::API_CLASS_TYPE_CORE, 1, 0);
}
/**
@@ -175,3 +177,4 @@ TrexStateless::generate_publish_snapshot(std::string &snapshot) {
snapshot = writer.write(root);
}
+
diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h
index 6e5e0c44..b506da61 100644
--- a/src/stateless/cp/trex_stateless.h
+++ b/src/stateless/cp/trex_stateless.h
@@ -27,27 +27,18 @@ limitations under the License.
#include <mutex>
-#include <trex_stream.h>
-#include <trex_stateless_port.h>
-#include <trex_rpc_server_api.h>
-#include <publisher/trex_publisher.h>
+#include "trex_stream.h"
+#include "trex_stateless_port.h"
+#include "trex_rpc_server_api.h"
-#include <flow_stat.h>
-#include <internal_api/trex_platform_api.h>
+#include "publisher/trex_publisher.h"
+#include "internal_api/trex_platform_api.h"
-/**
- * generic exception for errors
- * TODO: move this to a better place
- */
-class TrexException : public std::runtime_error
-{
-public:
- TrexException() : std::runtime_error("") {
+#include "flow_stat.h"
- }
- TrexException(const std::string &what) : std::runtime_error(what) {
- }
-};
+
+#include "trex_exception.h"
+#include "trex_api_class.h"
class TrexStatelessPort;
@@ -81,6 +72,7 @@ public:
} m_stats;
};
+
/**
* config object for stateless object
*
@@ -167,6 +159,14 @@ public:
return m_rpc_server;
}
+ const std::string & verify_api(APIClass::type_e type, int major, int minor) {
+ return m_api_classes[type].verify_api(major, minor);
+ }
+
+ const std::string & get_api_handler(APIClass::type_e type) {
+ return m_api_classes[type].get_api_handler();
+ }
+
CFlowStatRuleMgr m_rx_flow_stat;
protected:
@@ -187,6 +187,8 @@ protected:
TrexPublisher *m_publisher;
+ /* API */
+ APIClass m_api_classes[APIClass::API_CLASS_TYPE_MAX];
};
/**
@@ -197,6 +199,7 @@ protected:
* @return TrexStateless&
*/
TrexStateless * get_stateless_obj();
+CRxCoreStateless * get_rx_sl_core_obj();
#endif /* __TREX_STATELESS_H__ */
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 90589d7a..2239f3f6 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -272,6 +272,22 @@ TrexStatelessPort::stop_traffic(void) {
}
/**
+ * remove all RX filters from port
+ *
+ * @author imarom (28-Mar-16)
+ */
+void
+TrexStatelessPort::remove_rx_filters(void) {
+ /* only valid when IDLE or with streams and not TXing */
+ verify_state(PORT_STATE_STREAMS);
+
+ for (auto entry : m_stream_table) {
+ get_stateless_obj()->m_rx_flow_stat.stop_stream(entry.second);
+ }
+
+}
+
+/**
* when a port stops, perform various actions
*
*/
@@ -287,9 +303,6 @@ TrexStatelessPort::common_port_stop_actions(bool async) {
get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data);
}
- for (auto entry : m_stream_table) {
- get_stateless_obj()->m_rx_flow_stat.stop_stream(entry.second);
- }
}
void
@@ -768,26 +781,5 @@ TrexPortOwner::TrexPortOwner() {
m_seed = time(NULL);
}
-/**
- * generate a random connection handler
- *
- */
-std::string
-TrexPortOwner::generate_handler() {
- std::stringstream ss;
-
- static const char alphanum[] =
- "0123456789"
- "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
- "abcdefghijklmnopqrstuvwxyz";
-
- /* generate 8 bytes of random handler */
- for (int i = 0; i < 8; ++i) {
- ss << alphanum[rand_r(&m_seed) % (sizeof(alphanum) - 1)];
- }
-
- return (ss.str());
-}
-
const std::string TrexPortOwner::g_unowned_name = "<FREE>";
const std::string TrexPortOwner::g_unowned_handler = "";
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 7e1838d4..2167e735 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -21,6 +21,7 @@ limitations under the License.
#ifndef __TREX_STATELESS_PORT_H__
#define __TREX_STATELESS_PORT_H__
+#include "common/basic_utils.h"
#include "internal_api/trex_platform_api.h"
#include "trex_dp_port_events.h"
#include "trex_stream.h"
@@ -65,7 +66,7 @@ public:
m_owner_name = owner_name;
/* internal data */
- m_handler = generate_handler();
+ m_handler = utl_generate_random_str(m_seed, 8);
m_is_free = false;
}
@@ -83,7 +84,6 @@ public:
private:
- std::string generate_handler();
/* is this port owned by someone ? */
bool m_is_free;
@@ -178,6 +178,14 @@ public:
void stop_traffic(void);
/**
+ * remove all RX filters
+ * valid only when port is stopped
+ *
+ * @author imarom (28-Mar-16)
+ */
+ void remove_rx_filters(void);
+
+ /**
* pause traffic
* throws TrexException in case of an error
*/
diff --git a/src/stateless/cp/trex_stream.cpp b/src/stateless/cp/trex_stream.cpp
index 9c7898a8..e3f0ba7c 100644
--- a/src/stateless/cp/trex_stream.cpp
+++ b/src/stateless/cp/trex_stream.cpp
@@ -106,6 +106,15 @@ void TrexStream::Dump(FILE *fd){
}
}
+ if (m_rx_check.m_enabled) {
+ fprintf(fd, " Flow stat enabled:\n");
+ fprintf(fd, " seq check %s latency check %s packet group id %d hw_id %d\n"
+ , m_rx_check.m_seq_enabled ? "enabled":"disabled"
+ , m_rx_check.m_latency ? "enabled":"disabled", m_rx_check.m_pg_id, m_rx_check.m_hw_id
+ );
+ } else {
+ fprintf(fd, " Flow stat disabled\n");
+ }
fprintf(fd," rate :\n\n");
fprintf(fd," pps : %f\n", m_rate.get_pps());
diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp
index 563236c2..d6971d68 100644
--- a/src/stateless/cp/trex_streams_compiler.cpp
+++ b/src/stateless/cp/trex_streams_compiler.cpp
@@ -477,8 +477,10 @@ TrexStreamsCompiler::compile_stream(TrexStream *stream,
TrexStream *fixed_rx_flow_stat_stream = stream->clone(true);
- // not checking for errors. We assume that if add_stream succeeded, start_stream will too.
- get_stateless_obj()->m_rx_flow_stat.start_stream(fixed_rx_flow_stat_stream, fixed_rx_flow_stat_stream->m_rx_check.m_hw_id);
+ get_stateless_obj()->m_rx_flow_stat.start_stream(fixed_rx_flow_stat_stream);
+ // CFlowStatRuleMgr keeps state of the stream object. We duplicated the stream here (in order not
+ // change the packet kept in the stream). We want the state to be saved in the original stream.
+ get_stateless_obj()->m_rx_flow_stat.copy_state(fixed_rx_flow_stat_stream, stream);
/* can this stream be split to many cores ? */
if (!stream->is_splitable(dp_core_count)) {
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index ba25f61d..f125a46a 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -399,6 +399,7 @@ TrexStatelessDpCore::idle_state_loop() {
int counter = 0;
while (m_state == STATE_IDLE) {
+ m_core->m_node_gen.m_v_if->flush_dp_rx_queue();
bool had_msg = periodic_check_for_cp_messages();
if (had_msg) {
counter = 0;
diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp
index ab7c08d1..26f537f8 100644
--- a/src/stateless/rx/trex_stateless_rx_core.cpp
+++ b/src/stateless/rx/trex_stateless_rx_core.cpp
@@ -2,6 +2,7 @@
#include "bp_sim.h"
#include "flow_stat_parser.h"
#include "latency.h"
+#include "pal/linux/sanb_atomic.h"
#include "trex_stateless_messaging.h"
#include "trex_stateless_rx_core.h"
@@ -59,6 +60,8 @@ void CRxCoreStateless::idle_state_loop() {
if (had_msg) {
counter = 0;
continue;
+ } else {
+ flush_rx();
}
/* enter deep sleep only if enough time had passed */
@@ -72,8 +75,8 @@ void CRxCoreStateless::idle_state_loop() {
}
void CRxCoreStateless::start() {
- static int count = 0;
- static int i = 0;
+ int count = 0;
+ int i = 0;
bool do_try_rx_queue =CGlobalInfo::m_options.preview.get_vm_one_queue_enable() ? true : false;
while (true) {
@@ -91,7 +94,11 @@ void CRxCoreStateless::start() {
} else {
if (m_state == STATE_QUIT)
break;
+ count = 0;
+ i = 0;
+ set_working_msg_ack(false);
idle_state_loop();
+ set_working_msg_ack(true);
}
if (do_try_rx_queue) {
try_rx_queues();
@@ -101,7 +108,7 @@ void CRxCoreStateless::start() {
}
void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPort *lp, rte_mbuf_t *m) {
- Cxl710Parser parser;
+ CFlowStatParser parser;
if (parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) {
uint16_t ip_id;
@@ -162,6 +169,30 @@ void CRxCoreStateless::try_rx_queues() {
}
}
+// exactly the same as try_rx, without the handle_rx_pkt
+// purpose is to flush rx queues when core is in idle state
+void CRxCoreStateless::flush_rx() {
+ rte_mbuf_t * rx_pkts[64];
+ int i, total_pkts = 0;
+ for (i = 0; i < m_max_ports; i++) {
+ CLatencyManagerPerPort * lp = &m_ports[i];
+ rte_mbuf_t * m;
+ m_cpu_dp_u.start_work();
+ /* try to read 64 packets clean up the queue */
+ uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64);
+ total_pkts += cnt_p;
+ if (cnt_p) {
+ int j;
+ for (j = 0; j < cnt_p; j++) {
+ m = rx_pkts[j];
+ rte_pktmbuf_free(m);
+ }
+ /* commit only if there was work to do ! */
+ m_cpu_dp_u.commit();
+ }/* if work */
+ }// all ports
+}
+
int CRxCoreStateless::try_rx() {
rte_mbuf_t * rx_pkts[64];
int i, total_pkts = 0;
@@ -211,6 +242,12 @@ int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int
return 0;
}
+void CRxCoreStateless::set_working_msg_ack(bool val) {
+ sanb_smp_memory_barrier();
+ m_ack_start_work_msg = val;
+ sanb_smp_memory_barrier();
+}
+
double CRxCoreStateless::get_cpu_util() {
m_cpu_cp_u.Update();
return m_cpu_cp_u.GetVal();
diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h
index 5ab12f4e..b78256c2 100644
--- a/src/stateless/rx/trex_stateless_rx_core.h
+++ b/src/stateless/rx/trex_stateless_rx_core.h
@@ -54,6 +54,8 @@ class CRxCoreStateless {
void work() {m_state = STATE_WORKING;}
void idle() {m_state = STATE_IDLE;}
void quit() {m_state = STATE_QUIT;}
+ bool is_working() const {return (m_ack_start_work_msg == true);}
+ void set_working_msg_ack(bool val);
double get_cpu_util();
private:
@@ -62,6 +64,7 @@ class CRxCoreStateless {
void idle_state_loop();
void handle_rx_pkt(CLatencyManagerPerPort * lp, rte_mbuf_t * m);
void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r);
+ void flush_rx();
int try_rx();
void try_rx_queues();
bool is_flow_stat_id(uint16_t id);
@@ -71,10 +74,13 @@ class CRxCoreStateless {
uint32_t m_max_ports;
bool m_has_streams;
CLatencyManagerPerPort m_ports[TREX_MAX_PORTS];
- state_e m_state; /* state of all ports */
+ state_e m_state;
CNodeRing *m_ring_from_cp;
CNodeRing *m_ring_to_cp;
CCpuUtlDp m_cpu_dp_u;
CCpuUtlCp m_cpu_cp_u;
+ // Used for acking "work" (go out of idle) messages from cp
+ volatile bool m_ack_start_work_msg __rte_cache_aligned;
+
};
#endif