summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rwxr-xr-xsrc/common/basic_utils.cpp6
-rwxr-xr-xsrc/common/basic_utils.h2
-rw-r--r--src/flow_stat.cpp20
-rw-r--r--src/main_dpdk.cpp2
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_general.cpp36
-rw-r--r--src/stateless/cp/trex_stateless_port.h30
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp2
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h17
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.cpp40
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.h16
-rw-r--r--src/stateless/rx/trex_stateless_rx_port_mngr.cpp25
-rw-r--r--src/stateless/rx/trex_stateless_rx_port_mngr.h19
12 files changed, 138 insertions, 77 deletions
diff --git a/src/common/basic_utils.cpp b/src/common/basic_utils.cpp
index dfd3b183..fded49ec 100755
--- a/src/common/basic_utils.cpp
+++ b/src/common/basic_utils.cpp
@@ -273,18 +273,18 @@ void utl_set_coredump_size(long size, bool map_huge_pages) {
fclose(fp);
}
-uint32_t utl_ipv4_to_uint32(const char *ipv4_str, uint32_t &ipv4_num) {
+bool utl_ipv4_to_uint32(const char *ipv4_str, uint32_t &ipv4_num) {
uint32_t tmp;
int rc = my_inet_pton4(ipv4_str, (unsigned char *)&tmp);
if (!rc) {
- return (0);
+ return false;
}
ipv4_num = PAL_NTOHL(tmp);
- return (1);
+ return true;
}
std::string utl_uint32_to_ipv4(uint32_t ipv4_addr) {
diff --git a/src/common/basic_utils.h b/src/common/basic_utils.h
index c30457d7..36f9db85 100755
--- a/src/common/basic_utils.h
+++ b/src/common/basic_utils.h
@@ -101,7 +101,7 @@ std::string utl_generate_random_str(unsigned int &seed, int len);
*/
void utl_set_coredump_size(long size, bool map_huge_pages = false);
-uint32_t utl_ipv4_to_uint32(const char *ipv4_str, uint32_t &ipv4_num);
+bool utl_ipv4_to_uint32(const char *ipv4_str, uint32_t &ipv4_num);
std::string utl_uint32_to_ipv4(uint32_t ipv4_addr);
#endif
diff --git a/src/flow_stat.cpp b/src/flow_stat.cpp
index dae29795..ba125df2 100644
--- a/src/flow_stat.cpp
+++ b/src/flow_stat.cpp
@@ -802,11 +802,15 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream) {
#endif
if (m_num_started_streams == 0) {
+
send_start_stop_msg_to_rx(true); // First transmitting stream. Rx core should start reading packets;
+ assert(m_rx_core->is_working());
+
//also good time to zero global counters
memset(m_rx_cant_count_err, 0, sizeof(m_rx_cant_count_err));
memset(m_tx_cant_count_err, 0, sizeof(m_tx_cant_count_err));
+ #if 0
// wait to make sure that message is acknowledged. RX core might be in deep sleep mode, and we want to
// start transmitting packets only after it is working, otherwise, packets will get lost.
if (m_rx_core) { // in simulation, m_rx_core will be NULL
@@ -819,6 +823,8 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream) {
}
}
}
+ #endif
+
} else {
// make sure rx core is working. If not, we got really confused somehow.
if (m_rx_core)
@@ -966,13 +972,21 @@ int CFlowStatRuleMgr::set_mode(enum flow_stat_mode_e mode) {
extern bool rx_should_stop;
void CFlowStatRuleMgr::send_start_stop_msg_to_rx(bool is_start) {
TrexStatelessCpToRxMsgBase *msg;
-
+
if (is_start) {
- msg = new TrexStatelessRxEnableLatency();
+ static MsgReply<bool> reply;
+ reply.reset();
+
+ msg = new TrexStatelessRxEnableLatency(reply);
+ m_ring_to_rx->Enqueue((CGenNode *)msg);
+
+ /* hold until message was ack'ed - otherwise we might lose packets */
+ reply.wait_for_reply();
+
} else {
msg = new TrexStatelessRxDisableLatency();
+ m_ring_to_rx->Enqueue((CGenNode *)msg);
}
- m_ring_to_rx->Enqueue((CGenNode *)msg);
}
// return false if no counters changed since last run. true otherwise
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index 2783182a..c02d6760 100644
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -5744,7 +5744,7 @@ CFlowStatParser *CTRexExtendedDriverBase::get_flow_stat_parser() {
// in 1G we need to wait if links became ready to soon
void CTRexExtendedDriverBase1G::wait_after_link_up(){
- //wait_x_sec(6 + CGlobalInfo::m_options.m_wait_before_traffic);
+ wait_x_sec(6 + CGlobalInfo::m_options.m_wait_before_traffic);
}
int CTRexExtendedDriverBase1G::wait_for_stable_link(){
diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
index cb7d5149..107b7b4b 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
@@ -776,31 +776,31 @@ TrexRpcCmdSetRxFeature::parse_capture_msg(const Json::Value &msg, TrexStatelessP
void
TrexRpcCmdSetRxFeature::parse_queue_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) {
- bool enabled = parse_bool(msg, "enabled", result);
+ bool enabled = parse_bool(msg, "enabled", result);
- if (enabled) {
+ if (enabled) {
- uint64_t size = parse_uint32(msg, "size", result);
+ uint64_t size = parse_uint32(msg, "size", result);
- if (size == 0) {
- generate_parse_err(result, "queue size cannot be zero");
- }
+ if (size == 0) {
+ generate_parse_err(result, "queue size cannot be zero");
+ }
- try {
- port->start_rx_queue(size);
- } catch (const TrexException &ex) {
- generate_execute_err(result, ex.what());
- }
+ try {
+ port->start_rx_queue(size);
+ } catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
- } else {
+ } else {
- try {
- port->stop_rx_queue();
- } catch (const TrexException &ex) {
- generate_execute_err(result, ex.what());
- }
+ try {
+ port->stop_rx_queue();
+ } catch (const TrexException &ex) {
+ generate_execute_err(result, ex.what());
+ }
- }
+ }
}
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index f2829b8a..74ab17f1 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -377,21 +377,21 @@ public:
*/
void stop_rx_capture();
- /**
- * start RX queueing of packets
- *
- * @author imarom (11/7/2016)
- *
- * @param limit
- */
- void start_rx_queue(uint64_t limit);
-
- /**
- * stop RX queueing
- *
- * @author imarom (11/7/2016)
- */
- void stop_rx_queue();
+ /**
+ * start RX queueing of packets
+ *
+ * @author imarom (11/7/2016)
+ *
+ * @param limit
+ */
+ void start_rx_queue(uint64_t limit);
+
+ /**
+ * stop RX queueing
+ *
+ * @author imarom (11/7/2016)
+ */
+ void stop_rx_queue();
/**
* fetch the RX queue packets from the queue
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index 53d5a87e..17acb21e 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -243,6 +243,8 @@ TrexDpPortEventMsg::handle() {
/************************* messages from CP to RX **********************/
bool TrexStatelessRxEnableLatency::handle (CRxCoreStateless *rx_core) {
rx_core->enable_latency();
+ m_reply.set_reply(true);
+
return true;
}
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index 303548aa..79a6bf08 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -63,7 +63,7 @@ public:
m_pending = false;
}
- T wait_for_reply(int timeout_ms = 100, int backoff_ms = 1) {
+ T wait_for_reply(int timeout_ms = 500, int backoff_ms = 1) {
int guard = timeout_ms;
while (is_pending()) {
@@ -461,7 +461,14 @@ public:
class TrexStatelessRxEnableLatency : public TrexStatelessCpToRxMsgBase {
+public:
+ TrexStatelessRxEnableLatency(MsgReply<bool> &reply) : m_reply(reply) {
+ }
+
bool handle (CRxCoreStateless *rx_core);
+
+private:
+ MsgReply<bool> &m_reply;
};
class TrexStatelessRxDisableLatency : public TrexStatelessCpToRxMsgBase {
@@ -505,7 +512,7 @@ public:
virtual bool handle(CRxCoreStateless *rx_core);
private:
- uint8_t m_port_id;
+ uint8_t m_port_id;
};
@@ -515,8 +522,8 @@ public:
uint64_t size,
MsgReply<bool> &reply) : m_reply(reply) {
- m_port_id = port_id;
- m_size = size;
+ m_port_id = port_id;
+ m_size = size;
}
virtual bool handle(CRxCoreStateless *rx_core);
@@ -537,7 +544,7 @@ public:
virtual bool handle(CRxCoreStateless *rx_core);
private:
- uint8_t m_port_id;
+ uint8_t m_port_id;
};
diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp
index f518fcd3..dc637163 100644
--- a/src/stateless/rx/trex_stateless_rx_core.cpp
+++ b/src/stateless/rx/trex_stateless_rx_core.cpp
@@ -121,6 +121,7 @@ bool CRxCoreStateless::periodic_check_for_cp_messages() {
handle_cp_msg(msg);
}
+ /* a message might result in a change of state */
recalculate_next_state();
return true;
@@ -218,9 +219,7 @@ void CRxCoreStateless::start() {
switch (m_state) {
case STATE_IDLE:
- set_working_msg_ack(false);
idle_state_loop();
- set_working_msg_ack(true);
break;
case STATE_WORKING:
@@ -311,23 +310,11 @@ void CRxCoreStateless::reset_rx_stats(uint8_t port_id) {
int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max
, bool reset, TrexPlatformApi::driver_stat_cap_e type) {
- RXLatency &latency = m_rx_port_mngr[port_id].get_latency();
-
- for (int hw_id = min; hw_id <= max; hw_id++) {
- if (type == TrexPlatformApi::IF_STAT_PAYLOAD) {
- rx_stats[hw_id - min] = latency.m_rx_pg_stat_payload[hw_id];
- } else {
- rx_stats[hw_id - min] = latency.m_rx_pg_stat[hw_id];
- }
- if (reset) {
- if (type == TrexPlatformApi::IF_STAT_PAYLOAD) {
- latency.m_rx_pg_stat_payload[hw_id].clear();
- } else {
- latency.m_rx_pg_stat[hw_id].clear();
- }
- }
- }
- return 0;
+ /* for now only latency stats */
+ m_rx_port_mngr[port_id].get_latency_stats(rx_stats, min, max, reset, type);
+
+ return (0);
+
}
int CRxCoreStateless::get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset) {
@@ -354,13 +341,6 @@ int CRxCoreStateless::get_rx_err_cntrs(CRxCoreErrCntrs *rx_err) {
return 0;
}
-void CRxCoreStateless::set_working_msg_ack(bool val) {
- sanb_smp_memory_barrier();
- m_ack_start_work_msg = val;
- sanb_smp_memory_barrier();
-}
-
-
void CRxCoreStateless::update_cpu_util(){
m_cpu_cp_u.Update();
}
@@ -373,21 +353,25 @@ double CRxCoreStateless::get_cpu_util() {
void
CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit) {
m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit);
+ recalculate_next_state();
}
void
CRxCoreStateless::stop_recorder(uint8_t port_id) {
m_rx_port_mngr[port_id].stop_recorder();
+ recalculate_next_state();
}
void
CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size) {
m_rx_port_mngr[port_id].start_queue(size);
+ recalculate_next_state();
}
void
CRxCoreStateless::stop_queue(uint8_t port_id) {
m_rx_port_mngr[port_id].stop_queue();
+ recalculate_next_state();
}
void
@@ -395,6 +379,8 @@ CRxCoreStateless::enable_latency() {
for (int i = 0; i < m_max_ports; i++) {
m_rx_port_mngr[i].enable_latency();
}
+
+ recalculate_next_state();
}
void
@@ -402,6 +388,8 @@ CRxCoreStateless::disable_latency() {
for (int i = 0; i < m_max_ports; i++) {
m_rx_port_mngr[i].disable_latency();
}
+
+ recalculate_next_state();
}
const RXPortManager &
diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h
index b27a7ca5..7481ae2f 100644
--- a/src/stateless/rx/trex_stateless_rx_core.h
+++ b/src/stateless/rx/trex_stateless_rx_core.h
@@ -29,7 +29,10 @@
class TrexStatelessCpToRxMsgBase;
-
+/**
+ * RFC 2544 implementation
+ *
+ */
class CRFC2544Info {
public:
void create();
@@ -88,7 +91,15 @@ class CRxCoreErrCntrs {
uint64_t m_old_flow;
};
+/**
+ * stateless RX core
+ *
+ */
class CRxCoreStateless {
+
+ /**
+ * core states
+ */
enum state_e {
STATE_IDLE,
STATE_WORKING,
@@ -106,8 +117,7 @@ class CRxCoreStateless {
void quit() {m_state = STATE_QUIT;}
- bool is_working() const {return (m_ack_start_work_msg == true);}
- void set_working_msg_ack(bool val);
+ bool is_working() const {return (m_state == STATE_WORKING);}
double get_cpu_util();
void update_cpu_util();
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
index 00032e8b..bd8650a4 100644
--- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
@@ -156,6 +156,31 @@ RXLatency::reset_stats() {
}
}
+
+void
+RXLatency::get_stats(rx_per_flow_t *rx_stats,
+ int min,
+ int max,
+ bool reset,
+ TrexPlatformApi::driver_stat_cap_e type) {
+
+ for (int hw_id = min; hw_id <= max; hw_id++) {
+ if (type == TrexPlatformApi::IF_STAT_PAYLOAD) {
+ rx_stats[hw_id - min] = m_rx_pg_stat_payload[hw_id];
+ } else {
+ rx_stats[hw_id - min] = m_rx_pg_stat[hw_id];
+ }
+ if (reset) {
+ if (type == TrexPlatformApi::IF_STAT_PAYLOAD) {
+ m_rx_pg_stat_payload[hw_id].clear();
+ } else {
+ m_rx_pg_stat[hw_id].clear();
+ }
+ }
+ }
+}
+
+
Json::Value
RXLatency::to_json() const {
return Json::objectValue;
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h
index bc34b5aa..6af90f8b 100644
--- a/src/stateless/rx/trex_stateless_rx_port_mngr.h
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h
@@ -43,12 +43,18 @@ public:
void create(CRFC2544Info *rfc2544, CRxCoreErrCntrs *err_cntrs);
- void reset_stats();
-
void handle_pkt(const rte_mbuf_t *m);
Json::Value to_json() const;
+ void get_stats(rx_per_flow_t *rx_stats,
+ int min,
+ int max,
+ bool reset,
+ TrexPlatformApi::driver_stat_cap_e type);
+
+ void reset_stats();
+
private:
bool is_flow_stat_id(uint32_t id) {
if ((id & 0x000fff00) == IP_ID_RESERVE_BASE) return true;
@@ -276,6 +282,15 @@ public:
m_latency.reset_stats();
}
+ void get_latency_stats(rx_per_flow_t *rx_stats,
+ int min,
+ int max,
+ bool reset,
+ TrexPlatformApi::driver_stat_cap_e type) {
+
+ return m_latency.get_stats(rx_stats, min, max, reset, type);
+ }
+
RXLatency & get_latency() {
return m_latency;
}