summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rwxr-xr-xsrc/bp_sim.cpp7
-rwxr-xr-xsrc/bp_sim.h5
-rw-r--r--src/debug.cpp12
-rw-r--r--src/flow_stat.cpp234
-rw-r--r--src/flow_stat.h27
-rw-r--r--src/flow_stat_parser.cpp52
-rw-r--r--src/flow_stat_parser.h36
-rw-r--r--src/gtest/trex_stateless_gtest.cpp2
-rw-r--r--src/internal_api/trex_platform_api.h13
-rw-r--r--src/latency.cpp11
-rw-r--r--src/latency.h4
-rw-r--r--src/main_dpdk.cpp377
-rw-r--r--src/main_dpdk.h10
-rwxr-xr-xsrc/msg_manager.cpp17
-rwxr-xr-xsrc/msg_manager.h26
-rw-r--r--src/stateless/cp/trex_stateless.cpp1
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp7
-rw-r--r--src/stateless/cp/trex_stateless_port.h100
-rw-r--r--src/stateless/cp/trex_streams_compiler.cpp3
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp16
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp46
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h80
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.cpp217
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.h80
24 files changed, 1022 insertions, 361 deletions
diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp
index 6ea40be2..cc9af837 100755
--- a/src/bp_sim.cpp
+++ b/src/bp_sim.cpp
@@ -6039,8 +6039,13 @@ uint16_t CSimplePacketParser::getPktSize(){
return ( ip_len +m_vlan_offset+14);
}
+uint16_t CSimplePacketParser::getIpId() {
+ if (m_ipv4) {
+ return ( m_ipv4->getId() );
+ }
-
+ return (0);
+}
uint8_t CSimplePacketParser::getTTl(){
if (m_ipv4) {
diff --git a/src/bp_sim.h b/src/bp_sim.h
index 37ed7854..4b1a88e3 100755
--- a/src/bp_sim.h
+++ b/src/bp_sim.h
@@ -1246,9 +1246,10 @@ static inline int get_is_rx_check_mode(){
return (CGlobalInfo::m_options.preview.get_is_rx_check_enable() ?1:0);
}
-static inline bool get_is_rx_filter_enable(){//???
+static inline bool get_is_rx_filter_enable(){
uint32_t latency_rate=CGlobalInfo::m_options.m_latency_rate;
- return ( ( get_is_rx_check_mode() || CGlobalInfo::is_learn_mode() || latency_rate != 0) ?true:false );
+ return ( ( get_is_rx_check_mode() || CGlobalInfo::is_learn_mode() || latency_rate != 0
+ || get_is_stateless()) ?true:false );
}
static inline uint16_t get_rx_check_hops() {
return (CGlobalInfo::m_options.m_rx_check_hops);
diff --git a/src/debug.cpp b/src/debug.cpp
index 902766a1..656549dc 100644
--- a/src/debug.cpp
+++ b/src/debug.cpp
@@ -415,11 +415,17 @@ int CTrexDebug::test_send(uint pkt_type) {
lp->dump_stats_extended(stdout);
}
for (port_id = 0; port_id < m_max_ports; port_id++) {
- uint64_t fdir_stat[MAX_FLOW_STATS];
+ rx_per_flow_t fdir_stat[MAX_FLOW_STATS];
+ uint64_t fdir_stat_64[MAX_FLOW_STATS];
CPhyEthIF *lp = &m_ports[port_id];
- if (lp->get_flow_stats(fdir_stat, NULL, 0, MAX_FLOW_STATS, false) == 0)
- rte_stat_dump_array(fdir_stat, "FDIR stat", MAX_FLOW_STATS);
+ if (lp->get_flow_stats(fdir_stat, NULL, 0, MAX_FLOW_STATS, false) == 0) {
+ for (int i = 0; i < MAX_FLOW_STATS; i++) {
+ fdir_stat_64[i] = fdir_stat[i].get_pkts();
+ }
+ rte_stat_dump_array(fdir_stat_64, "FDIR stat", MAX_FLOW_STATS);
+ }
}
+
return (0);
}
diff --git a/src/flow_stat.cpp b/src/flow_stat.cpp
index f03065d2..43bde08b 100644
--- a/src/flow_stat.cpp
+++ b/src/flow_stat.cpp
@@ -25,6 +25,7 @@
#include <os_time.h>
#include "internal_api/trex_platform_api.h"
#include "trex_stateless.h"
+#include "trex_stateless_messaging.h"
#include "trex_stream.h"
#include "flow_stat_parser.h"
#include "flow_stat.h"
@@ -33,7 +34,6 @@
#define FLOW_STAT_ADD_ALL_PORTS 255
static const uint16_t FREE_HW_ID = UINT16_MAX;
-static bool no_stat_supported = true;
inline std::string methodName(const std::string& prettyFunction)
{
@@ -106,7 +106,7 @@ int CFlowStatUserIdInfo::add_stream(uint8_t proto) {
#endif
if (proto != m_proto)
- return -1;
+ throw TrexException("Can't use same pg_id for streams with different l4 protocol");
m_ref_count++;
@@ -121,7 +121,7 @@ void CFlowStatUserIdInfo::reset_hw_id() {
// Next session will start counting from 0.
for (int i = 0; i < TREX_MAX_PORTS; i++) {
m_rx_counter_base[i] += m_rx_counter[i];
- m_rx_counter[i] = 0;
+ memset(&m_rx_counter[i], 0, sizeof(m_rx_counter[0]));
m_tx_counter_base[i] += m_tx_counter[i];
memset(&m_tx_counter[i], 0, sizeof(m_tx_counter[0]));
}
@@ -197,7 +197,7 @@ int CFlowStatUserIdMap::add_stream(uint32_t user_id, uint8_t proto) {
if (! c_user_id) {
c_user_id = add_user_id(user_id, proto);
if (! c_user_id)
- return -1;
+ throw TrexException("Failed adding statistic counter - Failure in add_stream");
return 0;
} else {
return c_user_id->add_stream(proto);
@@ -213,7 +213,7 @@ int CFlowStatUserIdMap::del_stream(uint32_t user_id) {
c_user_id = find_user_id(user_id);
if (! c_user_id) {
- return -1;
+ throw TrexException("Trying to delete stream which does not exist");
}
if (c_user_id->del_stream() == 0) {
@@ -236,13 +236,13 @@ int CFlowStatUserIdMap::start_stream(uint32_t user_id, uint16_t hw_id) {
if (! c_user_id) {
fprintf(stderr, "%s Error: Trying to associate hw id %d to user_id %d but it does not exist\n"
, __func__, hw_id, user_id);
- return -1;
+ throw TrexException("Internal error: Trying to associate non exist group id");
}
if (c_user_id->is_hw_id()) {
- fprintf(stderr, "%s Error: Trying to associate hw id %d to user_id %d but it is already associate to %u\n"
+ fprintf(stderr, "%s Error: Trying to associate hw id %d to user_id %d but it is already associated to %u\n"
, __func__, hw_id, user_id, c_user_id->get_hw_id());
- return -1;
+ throw TrexException("Internal error: Trying to associate used packet group id to different hardware counter");
}
c_user_id->set_hw_id(hw_id);
c_user_id->add_started_stream();
@@ -259,9 +259,9 @@ int CFlowStatUserIdMap::start_stream(uint32_t user_id) {
c_user_id = find_user_id(user_id);
if (! c_user_id) {
- fprintf(stderr, "%s Error: Trying to start stream on user_id %d but it does not exist\n"
+ fprintf(stderr, "%s Error: Trying to start stream on pg_id %d but it does not exist\n"
, __func__, user_id);
- return -1;
+ throw TrexException("Trying to start stream with non exist packet group id");
}
c_user_id->add_started_stream();
@@ -280,9 +280,9 @@ int CFlowStatUserIdMap::stop_stream(uint32_t user_id) {
c_user_id = find_user_id(user_id);
if (! c_user_id) {
- fprintf(stderr, "%s Error: Trying to stop stream on user_id %d but it does not exist\n"
+ fprintf(stderr, "%s Error: Trying to stop stream on pg_id %d but it does not exist\n"
, __func__, user_id);
- return -1;
+ throw TrexException("Trying to stop stream with non exist packet group id");
}
return c_user_id->stop_started_stream();
@@ -385,6 +385,34 @@ void CFlowStatHwIdMap::unmap(uint16_t hw_id) {
CFlowStatRuleMgr::CFlowStatRuleMgr() {
m_api = NULL;
m_max_hw_id = -1;
+ m_num_started_streams = 0;
+ m_ring_to_rx = NULL;
+ m_capabilities = 0;
+ m_parser = NULL;
+}
+
+CFlowStatRuleMgr::~CFlowStatRuleMgr() {
+ if (m_parser)
+ delete m_parser;
+}
+
+void CFlowStatRuleMgr::create() {
+ uint16_t num_counters, capabilities;
+ TrexStateless *tstateless = get_stateless_obj();
+ assert(tstateless);
+
+ m_api = tstateless->get_platform_api();
+ assert(m_api);
+ m_api->get_interface_stat_info(0, num_counters, capabilities);
+ m_api->get_port_num(m_num_ports);
+ for (uint8_t port = 0; port < m_num_ports; port++) {
+ assert(m_api->reset_hw_flow_stats(port) == 0);
+ }
+ m_ring_to_rx = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
+ assert(m_ring_to_rx);
+ m_parser = m_api->get_flow_stat_parser();
+ assert(m_parser);
+ m_capabilities = capabilities;
}
std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf) {
@@ -394,38 +422,30 @@ std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf) {
return os;
}
-int CFlowStatRuleMgr::compile_stream(const TrexStream * stream, Cxl710Parser &parser) {
+int CFlowStatRuleMgr::compile_stream(const TrexStream * stream, CFlowStatParser *parser) {
#ifdef __DEBUG_FUNC_ENTRY__
std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << " en:";
std::cout << stream->m_rx_check.m_enabled << std::endl;
#endif
- // currently we support only IP ID rule types
- // all our ports are the same type, so testing port 0 is enough
- uint16_t num_counters, capabilities;
- m_api->get_interface_stat_info(0, num_counters, capabilities);
- if ((capabilities & TrexPlatformApi::IF_STAT_IPV4_ID) == 0) {
- return -2;
- }
-
- if (parser.parse(stream->m_pkt.binary, stream->m_pkt.len) != 0) {
+ if (parser->parse(stream->m_pkt.binary, stream->m_pkt.len) != 0) {
// if we could not parse the packet, but no stat count needed, it is probably OK.
if (stream->m_rx_check.m_enabled) {
fprintf(stderr, "Error: %s - Compilation failed\n", __func__);
- return -1;
+ throw TrexException("Failed parsing given packet for flow stat. Probably bad packet format.");
} else {
return 0;
}
}
- if (!parser.is_fdir_supported()) {
+ if (!parser->is_stat_supported()) {
if (stream->m_stream_id <= 0) {
- // rx stat not needed. Do nothing.
+ // flow stat not needed. Do nothing.
return 0;
} else {
- // rx stat needed, but packet format is not supported
- fprintf(stderr, "Error: %s - Unsupported packet format for rx stat\n", __func__);
- return -1;
+ // flow stat needed, but packet format is not supported
+ fprintf(stderr, "Error: %s - Unsupported packet format for flow stat\n", __func__);
+ throw TrexException("Unsupported packet format for flow stat on given interface type");
}
}
return 0;
@@ -436,44 +456,36 @@ int CFlowStatRuleMgr::add_stream(const TrexStream * stream) {
std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl;
#endif
- if (! m_api ) {
- TrexStateless *tstateless = get_stateless_obj();
- m_api = tstateless->get_platform_api();
- uint16_t num_counters, capabilities;
- m_api->get_interface_stat_info(0, num_counters, capabilities);
- if ((capabilities & TrexPlatformApi::IF_STAT_IPV4_ID) == 0) {
- // All our interfaces are from the same type. If statistics not supported.
- // no operation will work
- return -1;
- } else {
- no_stat_supported = false;
- }
- m_api->get_port_num(m_num_ports);
- for (uint8_t port = 0; port < m_num_ports; port++) {
- assert(m_api->reset_hw_flow_stats(port) == 0);
- }
+ if (! stream->m_rx_check.m_enabled) {
+ return 0;
}
- if (no_stat_supported)
- return -ENOTSUP;
+ // Init everything here, and not in the constructor, since we relay on other objects
+ // By the time a stream is added everything else is initialized.
+ if (! m_api ) {
+ create();
+ }
- Cxl710Parser parser;
- int ret;
+ uint16_t rule_type = TrexPlatformApi::IF_STAT_IPV4_ID; // In the future need to get it from the stream;
- if (! stream->m_rx_check.m_enabled) {
- return 0;
+ if ((m_capabilities & rule_type) == 0) {
+ fprintf(stderr, "Error: %s - rule type not supported by interface\n", __func__);
+ throw TrexException("Interface does not support given rule type");
}
- if ((ret = compile_stream(stream, parser)) < 0)
- return ret;
+ // compile_stream throws exception if something goes wrong
+ compile_stream(stream, m_parser);
uint8_t l4_proto;
- if (parser.get_l4_proto(l4_proto) < 0) {
- printf("Error: %s failed finding l4 proto\n", __func__);
- return -1;
+ if (m_parser->get_l4_proto(l4_proto) < 0) {
+ fprintf(stderr, "Error: %s failed finding l4 proto\n", __func__);
+ throw TrexException("Failed determining l4 proto for packet");
}
- return m_user_id_map.add_stream(stream->m_rx_check.m_pg_id, l4_proto);
+ // throws exception if there is error
+ m_user_id_map.add_stream(stream->m_rx_check.m_pg_id, l4_proto);
+
+ return 0;
}
int CFlowStatRuleMgr::del_stream(const TrexStream * stream) {
@@ -481,14 +493,23 @@ int CFlowStatRuleMgr::del_stream(const TrexStream * stream) {
std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl;
#endif
- if (no_stat_supported)
- return -ENOTSUP;
+ if (! m_api)
+ throw TrexException("Called del_stream, but no stream was added");
if (! stream->m_rx_check.m_enabled) {
return 0;
}
- return m_user_id_map.del_stream(stream->m_rx_check.m_pg_id);
+ if (m_user_id_map.is_started(stream->m_rx_check.m_pg_id)) {
+ std::cerr << "Error: Trying to delete flow statistics stream " << stream->m_rx_check.m_pg_id
+ << " which is not stopped." << std::endl;
+ throw TrexException("Trying to delete stream which was not stopped");
+ }
+
+ // Throws exception in case of error
+ m_user_id_map.del_stream(stream->m_rx_check.m_pg_id);
+
+ return 0;
}
// called on all streams, when stream start to transmit
@@ -502,33 +523,49 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream, uint16_t &ret_hw_id) {
std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl;
#endif
- Cxl710Parser parser;
int ret;
-
- if (no_stat_supported)
- return -ENOTSUP;
-
- if ((ret = compile_stream(stream, parser)) < 0)
- return ret;
+ // Streams which does not need statistics might be started, before any stream that do
+ // need statistcs, so start_stream might be called before add_stream
+ if (! m_api ) {
+ create();
+ }
// first handle streams that do not need rx stat
if (! stream->m_rx_check.m_enabled) {
- // no need for stat count
+ try {
+ compile_stream(stream, m_parser);
+ } catch (TrexException) {
+ // If no statistics needed, and we can't parse the stream, that's OK.
+ return 0;
+ }
+
uint16_t ip_id;
- if (parser.get_ip_id(ip_id) < 0) {
- return 0; // if we could not find and ip id, no need to fix
+ if (m_parser->get_ip_id(ip_id) < 0) {
+ return 0; // if we could not find the ip id, no need to fix
}
// verify no reserved IP_ID used, and change if needed
if (ip_id >= IP_ID_RESERVE_BASE) {
- if (parser.set_ip_id(ip_id & 0xefff) < 0) {
- return -1;
+ if (m_parser->set_ip_id(ip_id & 0xefff) < 0) {
+ throw TrexException("Stream IP ID in reserved range. Failed changing it");
}
}
return 0;
}
- uint16_t hw_id;
// from here, we know the stream need rx stat
+
+ // compile_stream throws exception if something goes wrong
+ if ((ret = compile_stream(stream, m_parser)) < 0)
+ return ret;
+
+ uint16_t hw_id;
+ uint16_t rule_type = TrexPlatformApi::IF_STAT_IPV4_ID; // In the future, need to get it from the stream;
+
+ if ((m_capabilities & rule_type) == 0) {
+ fprintf(stderr, "Error: %s - rule type not supported by interface\n", __func__);
+ throw TrexException("Interface does not support given rule type");
+ }
+
if (m_user_id_map.is_started(stream->m_rx_check.m_pg_id)) {
m_user_id_map.start_stream(stream->m_rx_check.m_pg_id); // just increase ref count;
hw_id = m_user_id_map.get_hw_id(stream->m_rx_check.m_pg_id); // can't fail if we got here
@@ -536,19 +573,19 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream, uint16_t &ret_hw_id) {
hw_id = m_hw_id_map.find_free_hw_id();
if (hw_id == FREE_HW_ID) {
printf("Error: %s failed finding free hw_id\n", __func__);
- return -1;
+ throw TrexException("Failed allocating statistic counter. Probably all are used.");
} else {
if (hw_id > m_max_hw_id) {
m_max_hw_id = hw_id;
}
uint32_t user_id = stream->m_rx_check.m_pg_id;
- m_user_id_map.start_stream(user_id, hw_id);
+ m_user_id_map.start_stream(user_id, hw_id); // ??? can throw exception. return hw_id
m_hw_id_map.map(hw_id, user_id);
add_hw_rule(hw_id, m_user_id_map.l4_proto(user_id));
}
}
- parser.set_ip_id(IP_ID_RESERVE_BASE + hw_id);
+ m_parser->set_ip_id(IP_ID_RESERVE_BASE + hw_id);
ret_hw_id = hw_id;
@@ -556,6 +593,10 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream, uint16_t &ret_hw_id) {
std::cout << "exit:" << __METHOD_NAME__ << " hw_id:" << ret_hw_id << std::endl;
#endif
+ if (m_num_started_streams == 0) {
+ send_start_stop_msg_to_rx(true); // First transmitting stream. Rx core should start reading packets;
+ }
+ m_num_started_streams++;
return 0;
}
@@ -571,13 +612,13 @@ int CFlowStatRuleMgr::stop_stream(const TrexStream * stream) {
#ifdef __DEBUG_FUNC_ENTRY__
std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl;
#endif
- if (no_stat_supported)
- return -ENOTSUP;
-
if (! stream->m_rx_check.m_enabled) {
return 0;
}
+ if (! m_api)
+ throw TrexException("Called stop_stream, but no stream was added");
+
if (m_user_id_map.stop_stream(stream->m_rx_check.m_pg_id) == 0) {
// last stream associated with the entry stopped transmittig.
// remove user_id <--> hw_id mapping
@@ -585,12 +626,12 @@ int CFlowStatRuleMgr::stop_stream(const TrexStream * stream) {
uint16_t hw_id = m_user_id_map.get_hw_id(stream->m_rx_check.m_pg_id);
if (hw_id >= MAX_FLOW_STATS) {
fprintf(stderr, "Error: %s got wrong hw_id %d from unmap\n", __func__, hw_id);
- return -1;
+ throw TrexException("Internal error in stop_stream. Got bad hw_id");
} else {
// update counters, and reset before unmapping
CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map.get_user_id(hw_id));
assert(p_user_id != NULL);
- uint64_t rx_counter;
+ rx_per_flow_t rx_counter;
tx_per_flow_t tx_counter;
for (uint8_t port = 0; port < m_num_ports; port++) {
m_api->del_rx_flow_stat_rule(port, FLOW_STAT_RULE_TYPE_IPV4_ID, proto, hw_id);
@@ -605,6 +646,11 @@ int CFlowStatRuleMgr::stop_stream(const TrexStream * stream) {
m_hw_id_map.unmap(hw_id);
}
}
+ m_num_started_streams--;
+ assert (m_num_started_streams >= 0);
+ if (m_num_started_streams == 0) {
+ send_start_stop_msg_to_rx(false); // No more transmittig streams. Rx core shoulde get into idle loop.
+ }
return 0;
}
@@ -618,16 +664,28 @@ int CFlowStatRuleMgr::get_active_pgids(flow_stat_active_t &result) {
return 0;
}
+extern bool rx_should_stop;
+void CFlowStatRuleMgr::send_start_stop_msg_to_rx(bool is_start) {
+ TrexStatelessCpToRxMsgBase *msg;
+
+ if (is_start) {
+ msg = new TrexStatelessRxStartMsg();
+ } else {
+ msg = new TrexStatelessRxStopMsg();
+ }
+ m_ring_to_rx->Enqueue((CGenNode *)msg);
+}
+
// return false if no counters changed since last run. true otherwise
bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) {
- uint64_t rx_stats[MAX_FLOW_STATS];
+ rx_per_flow_t rx_stats[MAX_FLOW_STATS];
tx_per_flow_t tx_stats[MAX_FLOW_STATS];
Json::FastWriter writer;
Json::Value root;
root["name"] = "flow_stats";
root["type"] = 0;
-
+
if (baseline) {
root["baseline"] = true;
}
@@ -645,15 +703,16 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) {
for (uint8_t port = 0; port < m_num_ports; port++) {
m_api->get_flow_stats(port, rx_stats, (void *)tx_stats, 0, m_max_hw_id, false);
for (int i = 0; i <= m_max_hw_id; i++) {
- if (rx_stats[i] != 0) {
+ if (rx_stats[i].get_pkts() != 0) {
+ rx_per_flow_t rx_pkts = rx_stats[i];
CFlowStatUserIdInfo *p_user_id = m_user_id_map.find_user_id(m_hw_id_map.get_user_id(i));
if (likely(p_user_id != NULL)) {
- if (p_user_id->get_rx_counter(port) != rx_stats[i]) {
- p_user_id->set_rx_counter(port, rx_stats[i]);
+ if (p_user_id->get_rx_counter(port) != rx_pkts) {
+ p_user_id->set_rx_counter(port, rx_pkts);
p_user_id->set_need_to_send_rx(port);
}
} else {
- std::cerr << __METHOD_NAME__ << i << ":Could not count " << rx_stats[i] << " rx packets, on port "
+ std::cerr << __METHOD_NAME__ << i << ":Could not count " << rx_pkts << " rx packets, on port "
<< (uint16_t)port << ", because no mapping was found." << std::endl;
}
}
@@ -690,7 +749,8 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) {
std::string str_port = static_cast<std::ostringstream*>( &(std::ostringstream() << int(port) ) )->str();
if (user_id_info->need_to_send_rx(port) || baseline) {
user_id_info->set_no_need_to_send_rx(port);
- data_section[str_user_id]["rx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_rx_counter(port));
+ data_section[str_user_id]["rx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_rx_counter(port).get_pkts());
+ data_section[str_user_id]["rx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_rx_counter(port).get_bytes());
send_empty = false;
}
if (user_id_info->need_to_send_tx(port) || baseline) {
diff --git a/src/flow_stat.h b/src/flow_stat.h
index 3e00a180..ea33062d 100644
--- a/src/flow_stat.h
+++ b/src/flow_stat.h
@@ -26,6 +26,7 @@
#include <map>
#include "trex_defs.h"
#include "trex_stream.h"
+#include "msg_manager.h"
#include <internal_api/trex_platform_api.h>
// range reserved for rx stat measurement is from IP_ID_RESERVE_BASE to 0xffff
@@ -50,7 +51,7 @@ class tx_per_flow_t_ {
inline void set_bytes(uint64_t bytes) {
m_bytes = bytes;;
}
- inline void get_pkts(uint64_t pkts) {
+ inline void set_pkts(uint64_t pkts) {
m_pkts = pkts;
}
inline void add_bytes(uint64_t bytes) {
@@ -100,16 +101,17 @@ class tx_per_flow_t_ {
};
typedef class tx_per_flow_t_ tx_per_flow_t;
+typedef class tx_per_flow_t_ rx_per_flow_t;
class CPhyEthIF;
-class Cxl710Parser;
+class CFlowStatParser;
class CFlowStatUserIdInfo {
public:
CFlowStatUserIdInfo(uint8_t proto);
friend std::ostream& operator<<(std::ostream& os, const CFlowStatUserIdInfo& cf);
- void set_rx_counter(uint8_t port, uint64_t val) {m_rx_counter[port] = val;}
- uint64_t get_rx_counter(uint8_t port) {return m_rx_counter[port] + m_rx_counter_base[port];}
+ void set_rx_counter(uint8_t port, rx_per_flow_t val) {m_rx_counter[port] = val;}
+ rx_per_flow_t get_rx_counter(uint8_t port) {return m_rx_counter[port] + m_rx_counter_base[port];}
void set_tx_counter(uint8_t port, tx_per_flow_t val) {m_tx_counter[port] = val;}
tx_per_flow_t get_tx_counter(uint8_t port) {return m_tx_counter[port] + m_tx_counter_base[port];}
void set_hw_id(uint16_t hw_id) {m_hw_id = hw_id;}
@@ -135,16 +137,16 @@ class CFlowStatUserIdInfo {
private:
bool m_rx_changed[TREX_MAX_PORTS]; // Which RX counters changed since we last published
bool m_tx_changed[TREX_MAX_PORTS]; // Which TX counters changed since we last published
- uint64_t m_rx_counter[TREX_MAX_PORTS]; // How many packets received with this user id since stream start
+ rx_per_flow_t m_rx_counter[TREX_MAX_PORTS]; // How many packets received with this user id since stream start
// How many packets received with this user id, since stream creation, before stream start.
- uint64_t m_rx_counter_base[TREX_MAX_PORTS];
+ rx_per_flow_t m_rx_counter_base[TREX_MAX_PORTS];
tx_per_flow_t m_tx_counter[TREX_MAX_PORTS]; // How many packets transmitted with this user id since stream start
// How many packets transmitted with this user id, since stream creation, before stream start.
tx_per_flow_t m_tx_counter_base[TREX_MAX_PORTS];
uint16_t m_hw_id; // Associated hw id. UINT16_MAX if no associated hw id.
uint8_t m_proto; // protocol (UDP, TCP, other), associated with this user id.
- uint8_t m_ref_count; // How many streams with this ref count exists
- uint8_t m_trans_ref_count; // How many streams with this ref count currently transmit
+ uint8_t m_ref_count; // How many streams with this user id exists
+ uint8_t m_trans_ref_count; // How many streams with this user id currently transmit
bool m_was_sent; // Did we send this info to clients once?
};
@@ -196,6 +198,7 @@ class CFlowStatRuleMgr {
};
CFlowStatRuleMgr();
+ ~CFlowStatRuleMgr();
friend std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf);
int add_stream(const TrexStream * stream);
int del_stream(const TrexStream * stream);
@@ -205,8 +208,10 @@ class CFlowStatRuleMgr {
bool dump_json(std::string & json, bool baseline);
private:
- int compile_stream(const TrexStream * stream, Cxl710Parser &parser);
+ void create();
+ int compile_stream(const TrexStream * stream, CFlowStatParser *parser);
int add_hw_rule(uint16_t hw_id, uint8_t proto);
+ void send_start_stop_msg_to_rx(bool is_start);
private:
CFlowStatHwIdMap m_hw_id_map; // map hw ids to user ids
@@ -214,6 +219,10 @@ class CFlowStatRuleMgr {
uint8_t m_num_ports; // How many ports are being used
const TrexPlatformApi *m_api;
int m_max_hw_id; // max hw id we ever used
+ uint32_t m_num_started_streams; // How many started (transmitting) streams we have
+ CNodeRing *m_ring_to_rx; // handle for sending messages to Rx core
+ CFlowStatParser *m_parser;
+ uint16_t m_capabilities;
};
#endif
diff --git a/src/flow_stat_parser.cpp b/src/flow_stat_parser.cpp
index 52824f73..8a77c82d 100644
--- a/src/flow_stat_parser.cpp
+++ b/src/flow_stat_parser.cpp
@@ -25,38 +25,36 @@
#include <common/Network/Packet/EthernetHeader.h>
#include <flow_stat_parser.h>
-Cxl710Parser::Cxl710Parser() {
- reset();
-}
-
-void Cxl710Parser::reset() {
+void CFlowStatParser::reset() {
m_ipv4 = 0;
m_l4_proto = 0;
- m_fdir_supported = false;
+ m_stat_supported = false;
}
-int Cxl710Parser::parse(uint8_t *p, uint16_t len) {
+int CFlowStatParser::parse(uint8_t *p, uint16_t len) {
EthernetHeader *ether = (EthernetHeader *)p;
+ reset();
+
switch( ether->getNextProtocol() ) {
case EthernetHeader::Protocol::IP :
m_ipv4 = (IPHeader *)(p + 14);
- m_fdir_supported = true;
+ m_stat_supported = true;
break;
case EthernetHeader::Protocol::VLAN :
switch ( ether->getVlanProtocol() ){
case EthernetHeader::Protocol::IP:
m_ipv4 = (IPHeader *)(p + 18);
- m_fdir_supported = true;
+ m_stat_supported = true;
break;
default:
- m_fdir_supported = false;
+ m_stat_supported = false;
return -1;
}
break;
default:
- m_fdir_supported = false;
+ m_stat_supported = false;
return -1;
break;
}
@@ -64,7 +62,7 @@ int Cxl710Parser::parse(uint8_t *p, uint16_t len) {
return 0;
}
-int Cxl710Parser::get_ip_id(uint16_t &ip_id) {
+int CFlowStatParser::get_ip_id(uint16_t &ip_id) {
if (! m_ipv4)
return -1;
@@ -73,7 +71,7 @@ int Cxl710Parser::get_ip_id(uint16_t &ip_id) {
return 0;
}
-int Cxl710Parser::set_ip_id(uint16_t new_id) {
+int CFlowStatParser::set_ip_id(uint16_t new_id) {
if (! m_ipv4)
return -1;
@@ -84,7 +82,7 @@ int Cxl710Parser::set_ip_id(uint16_t new_id) {
return 0;
}
-int Cxl710Parser::get_l4_proto(uint8_t &proto) {
+int CFlowStatParser::get_l4_proto(uint8_t &proto) {
if (! m_ipv4)
return -1;
@@ -96,7 +94,7 @@ int Cxl710Parser::get_l4_proto(uint8_t &proto) {
static const uint16_t TEST_IP_ID = 0xabcd;
static const uint8_t TEST_L4_PROTO = 0x11;
-int Cxl710Parser::test() {
+int CFlowStatParser::test() {
uint16_t ip_id = 0;
uint8_t l4_proto;
uint8_t test_pkt[] = {
@@ -124,14 +122,34 @@ int Cxl710Parser::test() {
assert(m_ipv4->isChecksumOK() == true);
assert(get_l4_proto(l4_proto) == 0);
assert(l4_proto == TEST_L4_PROTO);
- assert(m_fdir_supported == true);
+ assert(m_stat_supported == true);
reset();
// bad packet
test_pkt[16] = 0xaa;
assert (parse(test_pkt, sizeof(test_pkt)) == -1);
- assert(m_fdir_supported == false);
+ assert(m_stat_supported == false);
+
+ return 0;
+}
+
+// In 82599 10G card we do not support VLANs
+int C82599Parser::parse(uint8_t *p, uint16_t len) {
+ EthernetHeader *ether = (EthernetHeader *)p;
+
+ reset();
+
+ switch( ether->getNextProtocol() ) {
+ case EthernetHeader::Protocol::IP :
+ m_ipv4 = (IPHeader *)(p + 14);
+ m_stat_supported = true;
+ break;
+ default:
+ m_stat_supported = false;
+ return -1;
+ break;
+ }
return 0;
}
diff --git a/src/flow_stat_parser.h b/src/flow_stat_parser.h
index 606a1bec..8c9e1418 100644
--- a/src/flow_stat_parser.h
+++ b/src/flow_stat_parser.h
@@ -19,19 +19,33 @@
limitations under the License.
*/
-class Cxl710Parser {
+#ifndef __FLOW_STAT_PARSER_H__
+#define __FLOW_STAT_PARSER_H__
+
+// Basic flow stat parser. Relevant for xl710/x710/x350 cards
+#include "common/Network/Packet/IPHeader.h"
+
+class CFlowStatParser {
public:
- Cxl710Parser();
- void reset();
- int parse(uint8_t *pkt, uint16_t len);
- bool is_fdir_supported() {return m_fdir_supported == true;};
- int get_ip_id(uint16_t &ip_id);
- int set_ip_id(uint16_t ip_id);
- int get_l4_proto(uint8_t &proto);
- int test();
+ virtual ~CFlowStatParser() {};
+ virtual void reset();
+ virtual int parse(uint8_t *pkt, uint16_t len);
+ virtual bool is_stat_supported() {return m_stat_supported == true;};
+ virtual int get_ip_id(uint16_t &ip_id);
+ virtual int set_ip_id(uint16_t ip_id);
+ virtual int get_l4_proto(uint8_t &proto);
+ virtual int test();
- private:
+ protected:
IPHeader *m_ipv4;
- bool m_fdir_supported;
+ bool m_stat_supported;
uint8_t m_l4_proto;
};
+
+class C82599Parser : public CFlowStatParser {
+ public:
+ ~C82599Parser() {};
+ int parse(uint8_t *pkt, uint16_t len);
+};
+
+#endif
diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp
index c3dfcb95..a5cf3307 100644
--- a/src/gtest/trex_stateless_gtest.cpp
+++ b/src/gtest/trex_stateless_gtest.cpp
@@ -3581,7 +3581,7 @@ class rx_stat_pkt_parse : public testing::Test {
TEST_F(rx_stat_pkt_parse, x710_parser) {
- Cxl710Parser parser;
+ CFlowStatParser parser;
parser.test();
}
diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h
index f8f76584..90eaa7c7 100644
--- a/src/internal_api/trex_platform_api.h
+++ b/src/internal_api/trex_platform_api.h
@@ -26,6 +26,7 @@ limitations under the License.
#include <vector>
#include <string>
#include <string.h>
+#include "flow_stat_parser.h"
#include "trex_defs.h"
/**
@@ -34,6 +35,7 @@ limitations under the License.
* @author imarom (06-Oct-15)
*/
+
class TrexPlatformGlobalStats {
public:
TrexPlatformGlobalStats() {
@@ -42,7 +44,7 @@ public:
struct {
double m_cpu_util;
-
+ double m_rx_cpu_util;
double m_tx_bps;
double m_rx_bps;
@@ -142,7 +144,7 @@ public:
virtual void publish_async_data_now(uint32_t key, bool baseline) const = 0;
virtual uint8_t get_dp_core_count() const = 0;
virtual void get_interface_stat_info(uint8_t interface_id, uint16_t &num_counters, uint16_t &capabilities) const =0;
- virtual int get_flow_stats(uint8_t port_id, uint64_t *stats, void *tx_stats, int min, int max, bool reset) const = 0;
+ virtual int get_flow_stats(uint8_t port_id, void *stats, void *tx_stats, int min, int max, bool reset) const = 0;
virtual int reset_hw_flow_stats(uint8_t port_id) const = 0;
virtual void get_port_num(uint8_t &port_num) const = 0;
virtual int add_rx_flow_stat_rule(uint8_t port_id, uint8_t type, uint16_t proto, uint16_t id) const = 0;
@@ -151,6 +153,7 @@ public:
virtual bool get_promiscuous(uint8_t port_id) const = 0;
virtual void flush_dp_messages() const = 0;
virtual int get_active_pgids(flow_stat_active_t &result) const = 0;
+ virtual CFlowStatParser *get_flow_stat_parser() const = 0;
virtual ~TrexPlatformApi() {}
};
@@ -171,7 +174,7 @@ public:
void publish_async_data_now(uint32_t key, bool baseline) const;
uint8_t get_dp_core_count() const;
void get_interface_stat_info(uint8_t interface_id, uint16_t &num_counters, uint16_t &capabilities) const;
- int get_flow_stats(uint8_t port_id, uint64_t *stats, void *tx_stats, int min, int max, bool reset) const;
+ int get_flow_stats(uint8_t port_id, void *stats, void *tx_stats, int min, int max, bool reset) const;
int reset_hw_flow_stats(uint8_t port_id) const;
void get_port_num(uint8_t &port_num) const;
int add_rx_flow_stat_rule(uint8_t port_id, uint8_t type, uint16_t proto, uint16_t id) const;
@@ -180,6 +183,7 @@ public:
bool get_promiscuous(uint8_t port_id) const;
void flush_dp_messages() const;
int get_active_pgids(flow_stat_active_t &result) const;
+ CFlowStatParser *get_flow_stat_parser() const;
};
@@ -225,7 +229,7 @@ public:
virtual void publish_async_data_now(uint32_t key, bool baseline) const {
}
- virtual int get_flow_stats(uint8_t port_id, uint64_t *stats, void *tx_stats, int min, int max, bool reset) const {return 0;};
+ virtual int get_flow_stats(uint8_t port_id, void *stats, void *tx_stats, int min, int max, bool reset) const {return 0;};
virtual int reset_hw_flow_stats(uint8_t port_id) const {return 0;};
virtual void get_port_num(uint8_t &port_num) const {port_num = 2;};
virtual int add_rx_flow_stat_rule(uint8_t port_id, uint8_t type, uint16_t proto, uint16_t id) const {return 0;}
@@ -241,6 +245,7 @@ public:
void flush_dp_messages() const {
}
int get_active_pgids(flow_stat_active_t &result) const {return 0;}
+ CFlowStatParser *get_flow_stat_parser() const {return new CFlowStatParser();}
private:
int m_dp_core_count;
diff --git a/src/latency.cpp b/src/latency.cpp
index d57e97c8..fff7935d 100644
--- a/src/latency.cpp
+++ b/src/latency.cpp
@@ -177,6 +177,9 @@ void CCPortLatency::reset(){
m_seq_error=0;
m_length_error=0;
m_no_ipv4_option=0;
+ for (int i = 0; i < MAX_FLOW_STATS; i++) {
+ m_rx_pg_stat[i].clear();
+ }
m_hist.Reset();
}
@@ -628,8 +631,8 @@ void CLatencyManager::handle_rx_pkt(CLatencyManagerPerPort * lp,
rte_pktmbuf_free(m);
}
-void CLatencyManager::handle_latency_pkt_msg(uint8_t thread_id,
- CGenNodeLatencyPktInfo * msg){
+// In VM, we receive the RX packets in DP core, and send message to RX core with the packet
+void CLatencyManager::handle_latency_pkt_msg(uint8_t thread_id, CGenNodeLatencyPktInfo * msg) {
assert(msg->m_latency_offset==0xdead);
@@ -666,6 +669,7 @@ void CLatencyManager::run_rx_queue_msgs(uint8_t thread_id,
}
}
+// VM mode function. Handle messages from DP
void CLatencyManager::try_rx_queues(){
CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp();
@@ -679,7 +683,6 @@ void CLatencyManager::try_rx_queues(){
}
}
-
void CLatencyManager::try_rx(){
rte_mbuf_t * rx_pkts[64];
int i;
@@ -712,7 +715,7 @@ void CLatencyManager::reset(){
}
-void CLatencyManager::start(int iter){
+void CLatencyManager::start(int iter) {
m_do_stop =false;
m_is_active =false;
int cnt=0;
diff --git a/src/latency.h b/src/latency.h
index 1f8ef5c0..3dd1cc36 100644
--- a/src/latency.h
+++ b/src/latency.h
@@ -86,6 +86,7 @@ public:
bool Parse();
uint8_t getTTl();
+ uint16_t getIpId();
uint16_t getPktSize();
// Check if packet contains latency data
@@ -243,9 +244,8 @@ public:
uint64_t m_seq_error;
uint64_t m_rx_check;
uint64_t m_no_ipv4_option;
-
-
uint64_t m_length_error;
+ rx_per_flow_t m_rx_pg_stat[MAX_FLOW_STATS];
CTimeHistogram m_hist; /* all window */
CJitter m_jitter;
};
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index 1b750bbd..46e9a95e 100644
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -58,6 +58,7 @@
#include "stateless/cp/trex_stateless.h"
#include "stateless/dp/trex_stream_node.h"
#include "stateless/messaging/trex_stateless_messaging.h"
+#include "stateless/rx/trex_stateless_rx_core.h"
#include "publisher/trex_publisher.h"
#include "../linux_dpdk/version.h"
extern "C" {
@@ -106,7 +107,7 @@ static inline int get_vm_one_queue_enable(){
}
static inline int get_is_rx_thread_enabled() {
- return (CGlobalInfo::m_options.is_rx_enabled() ?1:0);
+ return ((CGlobalInfo::m_options.is_rx_enabled() || CGlobalInfo::m_options.is_stateless()) ?1:0);
}
struct port_cfg_t;
@@ -140,10 +141,14 @@ public:
virtual int wait_for_stable_link()=0;
virtual void wait_after_link_up(){};
virtual bool flow_control_disable_supported(){return true;}
- virtual int get_rx_stats(CPhyEthIF * _if, uint32_t *stats, uint32_t *prev_stats, int min, int max) {return -1;}
+ virtual bool hw_rx_stat_supported(){return false;}
+ virtual int get_rx_stats(CPhyEthIF * _if, uint32_t *pkts, uint32_t *prev_pkts, uint32_t *bytes, uint32_t *prev_bytes
+ , int min, int max) {return -1;}
+ virtual int reset_rx_stats(CPhyEthIF * _if, uint32_t *stats) {return 0;}
virtual int dump_fdir_global_stats(CPhyEthIF * _if, FILE *fd) { return -1;}
virtual int get_stat_counters_num() {return 0;}
virtual int get_rx_stat_capabilities() {return 0;}
+ virtual CFlowStatParser *get_flow_stat_parser();
};
@@ -174,8 +179,8 @@ public:
virtual int configure_drop_queue(CPhyEthIF * _if);
virtual int configure_rx_filter_rules(CPhyEthIF * _if);
- int configure_rx_filter_rules_statefull(CPhyEthIF * _if);
- int configure_rx_filter_rules_stateless(CPhyEthIF * _if);
+ virtual int configure_rx_filter_rules_statefull(CPhyEthIF * _if);
+ virtual int configure_rx_filter_rules_stateless(CPhyEthIF * _if);
virtual bool is_hardware_support_drop_queue(){
return(true);
@@ -184,9 +189,11 @@ public:
virtual void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats);
virtual void clear_extended_stats(CPhyEthIF * _if);
-
+ virtual int dump_fdir_global_stats(CPhyEthIF * _if, FILE *fd) {return 0;}
+ virtual int get_stat_counters_num() {return MAX_FLOW_STATS;}
+ virtual int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;}
virtual int wait_for_stable_link();
- void wait_after_link_up();
+ virtual void wait_after_link_up();
};
class CTRexExtendedDriverBase1GVm : public CTRexExtendedDriverBase {
@@ -236,6 +243,8 @@ public:
virtual void clear_extended_stats(CPhyEthIF * _if);
virtual int wait_for_stable_link();
+ virtual int get_stat_counters_num() {return MAX_FLOW_STATS;}
+ virtual int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;}
};
@@ -262,15 +271,18 @@ public:
virtual bool is_hardware_filter_is_supported(){
return (true);
}
-
virtual int configure_rx_filter_rules(CPhyEthIF * _if);
-
+ virtual int configure_rx_filter_rules_stateless(CPhyEthIF * _if);
+ virtual int configure_rx_filter_rules_statefull(CPhyEthIF * _if);
virtual bool is_hardware_support_drop_queue(){
return(true);
}
virtual void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats);
virtual void clear_extended_stats(CPhyEthIF * _if);
virtual int wait_for_stable_link();
+ virtual int get_stat_counters_num() {return MAX_FLOW_STATS;}
+ virtual int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;}
+ virtual CFlowStatParser *get_flow_stat_parser();
};
class CTRexExtendedDriverBase40G : public CTRexExtendedDriverBase10G {
@@ -313,16 +325,21 @@ public:
}
virtual void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats);
virtual void clear_extended_stats(CPhyEthIF * _if);
- int get_rx_stats(CPhyEthIF * _if, uint32_t *stats, uint32_t *prev_stats, int min, int max);
- int dump_fdir_global_stats(CPhyEthIF * _if, FILE *fd);
- int get_stat_counters_num() {return MAX_FLOW_STATS;}
- int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;}
+ virtual int reset_rx_stats(CPhyEthIF * _if, uint32_t *stats);
+ virtual int get_rx_stats(CPhyEthIF * _if, uint32_t *pkts, uint32_t *prev_pkts, uint32_t *bytes, uint32_t *prev_bytes, int min, int max);
+ virtual int dump_fdir_global_stats(CPhyEthIF * _if, FILE *fd);
+ virtual int get_stat_counters_num() {return MAX_FLOW_STATS;}
+ virtual int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;}
virtual int wait_for_stable_link();
// disabling flow control on 40G using DPDK API causes the interface to malfunction
- bool flow_control_disable_supported(){return false;}
+ virtual bool flow_control_disable_supported(){return false;}
+ virtual bool hw_rx_stat_supported(){return true;}
+ virtual CFlowStatParser *get_flow_stat_parser();
+
private:
- void add_del_rules(enum rte_filter_op op, uint8_t port_id, uint16_t type, uint8_t ttl, uint16_t ip_id, int queue, uint16_t stat_idx);
+ virtual void add_del_rules(enum rte_filter_op op, uint8_t port_id, uint16_t type, uint8_t ttl, uint16_t ip_id, int queue, uint16_t stat_idx);
virtual int configure_rx_filter_rules_statfull(CPhyEthIF * _if);
+
private:
uint8_t m_if_per_card;
};
@@ -1019,11 +1036,11 @@ static int parse_options(int argc, char *argv[], CParserOption* po, bool first_t
}
if ( (po->is_latency_enabled()) || (po->preview.getOnlyLatency()) ){
- parse_err("Latecny check is not supported with interactive mode ");
+ parse_err("Latency check is not supported with interactive mode ");
}
if ( po->preview.getSingleCore() ){
- parse_err("single core is not supported with interactive mode ");
+ parse_err("Single core is not supported with interactive mode ");
}
}
@@ -1096,15 +1113,19 @@ public:
m_port_conf.fdir_conf.status=RTE_FDIR_NO_REPORT_STATUS;
/* Offset of flexbytes field in RX packets (in 16-bit word units). */
/* Note: divide by 2 to convert byte offset to word offset */
- if ( CGlobalInfo::m_options.preview.get_ipv6_mode_enable() ){
- m_port_conf.fdir_conf.flexbytes_offset=(14+6)/2;
- }else{
- m_port_conf.fdir_conf.flexbytes_offset=(14+8)/2;
- }
+ if (get_is_stateless()) {
+ m_port_conf.fdir_conf.flexbytes_offset = (14+4)/2;
+ } else {
+ if ( CGlobalInfo::m_options.preview.get_ipv6_mode_enable() ) {
+ m_port_conf.fdir_conf.flexbytes_offset = (14+6)/2;
+ } else {
+ m_port_conf.fdir_conf.flexbytes_offset = (14+8)/2;
+ }
- /* Increment offset 4 bytes for the case where we add VLAN */
- if ( CGlobalInfo::m_options.preview.get_vlan_mode_enable() ){
- m_port_conf.fdir_conf.flexbytes_offset+=(4/2);
+ /* Increment offset 4 bytes for the case where we add VLAN */
+ if ( CGlobalInfo::m_options.preview.get_vlan_mode_enable() ) {
+ m_port_conf.fdir_conf.flexbytes_offset += (4/2);
+ }
}
m_port_conf.fdir_conf.drop_queue=1;
}
@@ -1180,7 +1201,8 @@ void CPhyEthIFStats::Clear(){
oerrors = 0;
imcasts = 0;
rx_nombuf = 0;
- memset(m_rx_per_flow, 0, sizeof(m_rx_per_flow));
+ memset(m_rx_per_flow_pkts, 0, sizeof(m_rx_per_flow_pkts));
+ memset(m_rx_per_flow_bytes, 0, sizeof(m_rx_per_flow_bytes));
}
@@ -1214,6 +1236,7 @@ void CPhyEthIFStats::Dump(FILE *fd){
DP_A(rx_nombuf);
}
+// Clear the RX queue of an interface, dropping all packets
void CPhyEthIF::flush_rx_queue(void){
rte_mbuf_t * rx_pkts[32];
@@ -1786,6 +1809,9 @@ bool CCoreEthIF::Create(uint8_t core_id,
return (true);
}
+// This function is only relevant if we are in VM. In this case, we only have one rx queue. Can't have
+// rules to drop queue 0, and pass queue 1 to RX core, like in other cases.
+// We receive all packets in the same core that transmitted, and handle them to RX core.
void CCoreEthIF::flush_rx_queue(void){
pkt_dir_t dir ;
bool is_rx = get_is_rx_thread_enabled();
@@ -2300,6 +2326,7 @@ public:
float m_active_flows;
float m_open_flows;
float m_cpu_util;
+ float m_rx_cpu_util;
uint8_t m_threads;
uint32_t m_num_of_ports;
@@ -2588,15 +2615,17 @@ public:
int queues_prob_init();
int ixgbe_start();
int ixgbe_rx_queue_flush();
- int ixgbe_configure_mg();
+ void ixgbe_configure_mg();
+ void rx_sl_configure();
bool is_all_links_are_up(bool dump=false);
int reset_counters();
private:
- /* try to stop all datapath cores */
- void try_stop_all_dp();
+ /* try to stop all datapath cores and RX core */
+ void try_stop_all_cores();
/* send message to all dp cores */
int send_message_all_dp(TrexStatelessCpToDpMsgBase *msg);
+ int send_message_to_rx(TrexStatelessCpToRxMsgBase *msg);
void check_for_dp_message_from_core(int thread_id);
public:
@@ -2604,7 +2633,6 @@ public:
int start_master_statefull();
int start_master_stateless();
int run_in_core(virtual_thread_id_t virt_core_id);
- int stop_core(virtual_thread_id_t virt_core_id);
int core_for_rx(){
if ( (! get_is_rx_thread_enabled()) ) {
return -1;
@@ -2675,8 +2703,10 @@ public:
CParserOption m_po ;
CFlowGenList m_fl;
bool m_fl_was_init;
- volatile uint8_t m_signal[BP_MAX_CORES] __rte_cache_aligned ;
- CLatencyManager m_mg;
+ volatile uint8_t m_signal[BP_MAX_CORES] __rte_cache_aligned ; // Signal to main core when DP thread finished
+ volatile bool m_rx_running; // Signal main core when RX thread finished
+ CLatencyManager m_mg; // statefull RX core
+ CRxCoreStateless m_rx_sl; // stateless RX core
CTrexGlobalIoMode m_io_modes;
private:
@@ -2763,12 +2793,14 @@ bool CGlobalTRex::is_all_links_are_up(bool dump){
return (all_link_are);
}
+void CGlobalTRex::try_stop_all_cores(){
-void CGlobalTRex::try_stop_all_dp(){
-
- TrexStatelessDpQuit * msg= new TrexStatelessDpQuit();
- send_message_all_dp(msg);
- delete msg;
+ TrexStatelessDpQuit * dp_msg= new TrexStatelessDpQuit();
+ TrexStatelessRxQuit * rx_msg= new TrexStatelessRxQuit();
+ send_message_all_dp(dp_msg);
+ send_message_to_rx(rx_msg);
+ delete dp_msg;
+ // no need to delete rx_msg. Deleted by receiver
bool all_core_finished = false;
int i;
for (i=0; i<20; i++) {
@@ -2799,6 +2831,13 @@ int CGlobalTRex::send_message_all_dp(TrexStatelessCpToDpMsgBase *msg){
return (0);
}
+int CGlobalTRex::send_message_to_rx(TrexStatelessCpToRxMsgBase *msg) {
+ CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
+ ring->Enqueue((CGenNode *) msg);
+
+ return (0);
+}
+
int CGlobalTRex::ixgbe_rx_queue_flush(){
int i;
@@ -2810,7 +2849,7 @@ int CGlobalTRex::ixgbe_rx_queue_flush(){
}
-int CGlobalTRex::ixgbe_configure_mg(void){
+void CGlobalTRex::ixgbe_configure_mg(void) {
int i;
CLatencyManagerCfg mg_cfg;
mg_cfg.m_max_ports = m_max_ports;
@@ -2850,10 +2889,34 @@ int CGlobalTRex::ixgbe_configure_mg(void){
m_mg.Create(&mg_cfg);
m_mg.set_mask(CGlobalInfo::m_options.m_latency_mask);
-
- return (0);
}
+// init m_rx_sl object for stateless rx core
+void CGlobalTRex::rx_sl_configure(void) {
+ CRxSlCfg rx_sl_cfg;
+ int i;
+
+ rx_sl_cfg.m_max_ports = m_max_ports;
+
+ if ( get_vm_one_queue_enable() ) {
+ /* vm mode, indirect queues */
+ for (i=0; i < m_max_ports; i++) {
+ CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp();
+ uint8_t thread_id = (i >> 1);
+ CNodeRing * r = rx_dp->getRingCpToDp(thread_id);
+ m_latency_vm_vports[i].Create((uint8_t)i, r, &m_mg);
+ rx_sl_cfg.m_ports[i] = &m_latency_vm_vports[i];
+ }
+ } else {
+ for (i = 0; i < m_max_ports; i++) {
+ CPhyEthIF * _if = &m_ports[i];
+ m_latency_vports[i].Create(_if, m_latency_tx_queue_id, 1);
+ rx_sl_cfg.m_ports[i] = &m_latency_vports[i];
+ }
+ }
+
+ m_rx_sl.create(rx_sl_cfg);
+}
int CGlobalTRex::ixgbe_start(void){
int i;
@@ -2971,8 +3034,11 @@ int CGlobalTRex::ixgbe_start(void){
ixgbe_rx_queue_flush();
-
- ixgbe_configure_mg();
+ if (! get_is_stateless()) {
+ ixgbe_configure_mg();
+ } else {
+ rx_sl_configure();
+ }
/* core 0 - control
@@ -3361,6 +3427,9 @@ void CGlobalTRex::get_stats(CGlobalStats & stats){
stats.m_num_of_ports = m_max_ports;
stats.m_cpu_util = m_fl.GetCpuUtil();
+ if (get_is_stateless()) {
+ stats.m_rx_cpu_util = m_rx_sl.get_cpu_util();
+ }
stats.m_threads = m_fl.m_threads_info.size();
for (i=0; i<m_max_ports; i++) {
@@ -3724,7 +3793,7 @@ int CGlobalTRex::run_in_master() {
if (!is_all_cores_finished()) {
/* probably CLTR-C */
- try_stop_all_dp();
+ try_stop_all_cores();
}
m_mg.stop();
@@ -3739,16 +3808,17 @@ int CGlobalTRex::run_in_master() {
int CGlobalTRex::run_in_rx_core(void){
- if ( CGlobalInfo::m_options.is_rx_enabled() ){
- m_mg.start(0);
+ if (get_is_stateless()) {
+ m_rx_running = true;
+ m_rx_sl.start();
+ } else {
+ if ( CGlobalInfo::m_options.is_rx_enabled() ){
+ m_rx_running = true;
+ m_mg.start(0);
+ }
}
- // ??? start stateless rx
- return (0);
-}
-
-int CGlobalTRex::stop_core(virtual_thread_id_t virt_core_id){
- m_signal[virt_core_id]=1;
+ m_rx_running = false;
return (0);
}
@@ -3833,14 +3903,17 @@ int CGlobalTRex::stop_master(){
return (0);
}
-bool CGlobalTRex::is_all_cores_finished(){
+bool CGlobalTRex::is_all_cores_finished() {
int i;
for (i=0; i<get_cores_tx(); i++) {
if ( m_signal[i+1]==0){
- return (false);
+ return false;
}
}
- return (true);
+ if (m_rx_running)
+ return false;
+
+ return true;
}
@@ -3926,48 +3999,60 @@ int CGlobalTRex::start_master_statefull() {
////////////////////////////////////////////
-
static CGlobalTRex g_trex;
-// The HW counters start from some random values. The driver give us the diffs from previous,
-// each time we do get_rx_stats. We need to make one first call, at system startup,
-// and ignore the returned diffs
int CPhyEthIF::reset_hw_flow_stats() {
- uint32_t diff_stats[MAX_FLOW_STATS];
-
- if (get_ex_drv()->get_rx_stats(this, diff_stats, m_stats.m_fdir_prev_stats, 0, MAX_FLOW_STATS - 1) < 0) {
- return -1;
+ if (get_ex_drv()->hw_rx_stat_supported()) {
+ if (get_ex_drv()->reset_rx_stats(this, m_stats.m_fdir_prev_pkts) < 0) {
+ return -1;
+ }
+ } else {
+ g_trex.m_rx_sl.reset_rx_stats(get_port_id());
}
-
return 0;
}
// get/reset flow director counters
// return 0 if OK. -1 if operation not supported.
-// rx_stats, tx_stats - arrays of len max - min + 1. Returning rx, tx updated values.
+// rx_stats, tx_stats - arrays of len max - min + 1. Returning rx, tx updated absolute values.
// min, max - minimum, maximum counters range to get
// reset - If true, need to reset counter value after reading
-int CPhyEthIF::get_flow_stats(uint64_t *rx_stats, tx_per_flow_t *tx_stats, int min, int max, bool reset) {
- uint32_t diff_stats[MAX_FLOW_STATS];
-
- if (get_ex_drv()->get_rx_stats(this, diff_stats, m_stats.m_fdir_prev_stats, min, max) < 0) {
- return -1;
+int CPhyEthIF::get_flow_stats(rx_per_flow_t *rx_stats, tx_per_flow_t *tx_stats, int min, int max, bool reset) {
+ uint32_t diff_pkts[MAX_FLOW_STATS];
+ uint32_t diff_bytes[MAX_FLOW_STATS];
+ bool hw_rx_stat_supported = get_ex_drv()->hw_rx_stat_supported();
+
+ if (hw_rx_stat_supported) {
+ if (get_ex_drv()->get_rx_stats(this, diff_pkts, m_stats.m_fdir_prev_pkts
+ , diff_bytes, m_stats.m_fdir_prev_bytes, min, max) < 0) {
+ return -1;
+ }
+ } else {
+ g_trex.m_rx_sl.get_rx_stats(get_port_id(), rx_stats, min, max, reset);
}
for (int i = min; i <= max; i++) {
if ( reset ) {
// return value so far, and reset
- if (rx_stats != NULL) {
- rx_stats[i - min] = m_stats.m_rx_per_flow[i] + diff_stats[i];
+ if (hw_rx_stat_supported) {
+ if (rx_stats != NULL) {
+ rx_stats[i - min].set_pkts(m_stats.m_rx_per_flow_pkts[i] + diff_pkts[i]);
+ rx_stats[i - min].set_bytes(m_stats.m_rx_per_flow_bytes[i] + diff_bytes[i]);
+ }
+ m_stats.m_rx_per_flow_pkts[i] = 0;
+ m_stats.m_rx_per_flow_bytes[i] = 0;
}
if (tx_stats != NULL) {
tx_stats[i - min] = g_trex.clear_flow_tx_stats(m_port_id, i);
}
- m_stats.m_rx_per_flow[i] = 0;
} else {
- m_stats.m_rx_per_flow[i] += diff_stats[i];
- if (rx_stats != NULL) {
- rx_stats[i - min] = m_stats.m_rx_per_flow[i];
+ if (hw_rx_stat_supported) {
+ m_stats.m_rx_per_flow_pkts[i] += diff_pkts[i];
+ m_stats.m_rx_per_flow_bytes[i] += diff_bytes[i];
+ if (rx_stats != NULL) {
+ rx_stats[i - min].set_pkts(m_stats.m_rx_per_flow_pkts[i]);
+ rx_stats[i - min].set_bytes(m_stats.m_rx_per_flow_bytes[i]);
+ }
}
if (tx_stats != NULL) {
tx_stats[i - min] = g_trex.get_flow_tx_stats(m_port_id, i);
@@ -3978,6 +4063,8 @@ int CPhyEthIF::get_flow_stats(uint64_t *rx_stats, tx_per_flow_t *tx_stats, int m
return 0;
}
+// If needed, send packets to rx core for processing.
+// This is relevant only in VM case, where we receive packets to the working DP core (only 1 DP core in this case)
bool CCoreEthIF::process_rx_pkt(pkt_dir_t dir,
rte_mbuf_t * m){
@@ -3986,17 +4073,25 @@ bool CCoreEthIF::process_rx_pkt(pkt_dir_t dir,
return false;
}
bool send=false;
- CLatencyPktMode *c_l_pkt_mode = g_trex.m_mg.c_l_pkt_mode;
- bool is_lateancy_pkt = c_l_pkt_mode->IsLatencyPkt(parser.m_ipv4) & parser.IsLatencyPkt(parser.m_l4 + c_l_pkt_mode->l4_header_len());
- if (is_lateancy_pkt){
- send=true;
- }else{
- if ( get_is_rx_filter_enable() ){
- uint8_t max_ttl = 0xff - get_rx_check_hops();
- uint8_t pkt_ttl = parser.getTTl();
- if ( (pkt_ttl==max_ttl) || (pkt_ttl==(max_ttl-1) ) ) {
- send=true;
+ if ( get_is_stateless() ) {
+ // In stateless RX, we only care about flow stat packets
+ if ((parser.getIpId() & 0xff00) == IP_ID_RESERVE_BASE) {
+ send = true;
+ }
+ } else {
+ CLatencyPktMode *c_l_pkt_mode = g_trex.m_mg.c_l_pkt_mode;
+ bool is_lateancy_pkt = c_l_pkt_mode->IsLatencyPkt(parser.m_ipv4) & parser.IsLatencyPkt(parser.m_l4 + c_l_pkt_mode->l4_header_len());
+
+ if (is_lateancy_pkt) {
+ send = true;
+ } else {
+ if ( get_is_rx_filter_enable() ) {
+ uint8_t max_ttl = 0xff - get_rx_check_hops();
+ uint8_t pkt_ttl = parser.getTTl();
+ if ( (pkt_ttl==max_ttl) || (pkt_ttl==(max_ttl-1) ) ) {
+ send=true;
+ }
}
}
}
@@ -4036,7 +4131,6 @@ static int latency_one_lcore(__attribute__((unused)) void *dummy)
CPlatformSocketInfo * lpsock=&CGlobalInfo::m_socket;
physical_thread_id_t phy_id =rte_lcore_id();
-
if ( lpsock->thread_phy_is_rx(phy_id) ) {
g_trex.run_in_rx_core();
}else{
@@ -4060,7 +4154,6 @@ static int slave_one_lcore(__attribute__((unused)) void *dummy)
CPlatformSocketInfo * lpsock=&CGlobalInfo::m_socket;
physical_thread_id_t phy_id =rte_lcore_id();
-
if ( lpsock->thread_phy_is_rx(phy_id) ) {
g_trex.run_in_rx_core();
}else{
@@ -4387,7 +4480,7 @@ int main_test(int argc , char * argv[]){
&& (CGlobalInfo::m_options.m_latency_prev > 0)) {
uint32_t pkts = CGlobalInfo::m_options.m_latency_prev *
CGlobalInfo::m_options.m_latency_rate;
- printf("Start prev latency check- for %d sec \n",CGlobalInfo::m_options.m_latency_prev);
+ printf("Starting pre latency check for %d sec\n",CGlobalInfo::m_options.m_latency_prev);
g_trex.m_mg.start(pkts);
delay(CGlobalInfo::m_options.m_latency_prev* 1000);
printf("Finished \n");
@@ -4395,6 +4488,7 @@ int main_test(int argc , char * argv[]){
g_trex.reset_counters();
}
+ g_trex.m_rx_running = false;
if ( get_is_stateless() ) {
g_trex.start_master_stateless();
@@ -4448,6 +4542,12 @@ int CTRexExtendedDriverBase::configure_drop_queue(CPhyEthIF * _if) {
return (rte_eth_dev_rx_queue_stop(port_id, 0));
}
+CFlowStatParser *CTRexExtendedDriverBase::get_flow_stat_parser() {
+ CFlowStatParser *parser = new CFlowStatParser();
+ assert (parser);
+ return parser;
+}
+
void wait_x_sec(int sec) {
int i;
printf(" wait %d sec ", sec);
@@ -4610,7 +4710,7 @@ int CTRexExtendedDriverBase1G::configure_rx_filter_rules_stateless(CPhyEthIF * _
}
rule_id = 0;
- // filter for byte 18 of packet (lsb of IP ID) should equal ff
+ // filter for byte 18 of packet (msb of IP ID) should equal ff
_if->pci_reg_write( (E1000_FHFT(rule_id)+(2*16)) , 0x00ff0000);
_if->pci_reg_write( (E1000_FHFT(rule_id)+(2*16) + 8) , 0x04); /* MASK */
// + bytes 12 + 13 (ether type) should indicate IP.
@@ -4682,7 +4782,13 @@ void CTRexExtendedDriverBase1G::get_extended_stats(CPhyEthIF * _if,CPhyEthIFStat
void CTRexExtendedDriverBase1G::clear_extended_stats(CPhyEthIF * _if){
}
-
+#if 0
+int CTRexExtendedDriverBase1G::get_rx_stats(CPhyEthIF * _if, uint32_t *pkts, uint32_t *prev_pkts
+ ,uint32_t *bytes, uint32_t *prev_bytes, int min, int max) {
+ uint32_t port_id = _if->get_port_id();
+ return g_trex.m_rx_sl.get_rx_stats(port_id, pkts, prev_pkts, bytes, prev_bytes, min, max);
+}
+#endif
void CTRexExtendedDriverBase10G::clear_extended_stats(CPhyEthIF * _if){
_if->pci_reg_read(IXGBE_RXNFGPC);
@@ -4698,7 +4804,43 @@ void CTRexExtendedDriverBase10G::update_configuration(port_cfg_t * cfg){
cfg->m_tx_conf.tx_thresh.wthresh = TX_WTHRESH;
}
-int CTRexExtendedDriverBase10G::configure_rx_filter_rules(CPhyEthIF * _if){
+int CTRexExtendedDriverBase10G::configure_rx_filter_rules(CPhyEthIF * _if) {
+ if ( get_is_stateless() ) {
+ return configure_rx_filter_rules_stateless(_if);
+ } else {
+ return configure_rx_filter_rules_statefull(_if);
+ }
+
+ return 0;
+}
+
+int CTRexExtendedDriverBase10G::configure_rx_filter_rules_stateless(CPhyEthIF * _if) {
+ uint8_t port_id = _if->get_rte_port_id();
+ int ip_id_lsb;
+
+ for (ip_id_lsb = 0; ip_id_lsb < MAX_FLOW_STATS; ip_id_lsb++ ) {
+ struct rte_eth_fdir_filter fdir_filter;
+ int res = 0;
+
+ memset(&fdir_filter,0,sizeof(fdir_filter));
+ fdir_filter.input.flow_type = RTE_ETH_FLOW_NONFRAG_IPV4_OTHER;
+ fdir_filter.soft_id = ip_id_lsb; // We can use the ip_id_lsb also as filter soft_id
+ fdir_filter.input.flow_ext.flexbytes[0] = 0xff;
+ fdir_filter.input.flow_ext.flexbytes[1] = ip_id_lsb;
+ fdir_filter.action.rx_queue = 1;
+ fdir_filter.action.behavior = RTE_ETH_FDIR_ACCEPT;
+ fdir_filter.action.report_status = RTE_ETH_FDIR_NO_REPORT_STATUS;
+ res = rte_eth_dev_filter_ctrl(port_id, RTE_ETH_FILTER_FDIR, RTE_ETH_FILTER_ADD, &fdir_filter);
+
+ if (res != 0) {
+ rte_exit(EXIT_FAILURE, " ERROR rte_eth_dev_filter_ctrl : %d\n",res);
+ }
+ }
+
+ return 0;
+}
+
+int CTRexExtendedDriverBase10G::configure_rx_filter_rules_statefull(CPhyEthIF * _if) {
uint8_t port_id=_if->get_rte_port_id();
uint16_t hops = get_rx_check_hops();
uint16_t v4_hops = (hops << 8)&0xff00;
@@ -4809,6 +4951,12 @@ int CTRexExtendedDriverBase10G::wait_for_stable_link(){
return (0);
}
+CFlowStatParser *CTRexExtendedDriverBase10G::get_flow_stat_parser() {
+ CFlowStatParser *parser = new C82599Parser();
+ assert (parser);
+ return parser;
+}
+
////////////////////////////////////////////////////////////////////////////////
void CTRexExtendedDriverBase40G::clear_extended_stats(CPhyEthIF * _if){
rte_eth_stats_reset(_if->get_port_id());
@@ -4939,13 +5087,24 @@ int CTRexExtendedDriverBase40G::configure_rx_filter_rules(CPhyEthIF * _if) {
}
}
+int CTRexExtendedDriverBase40G::reset_rx_stats(CPhyEthIF * _if, uint32_t *stats) {
+ uint32_t diff_stats[MAX_FLOW_STATS];
+
+ // The HW counters start from some random values. The driver give us the diffs from previous,
+ // each time we do get_rx_stats. We need to make one first call, at system startup,
+ // and ignore the returned diffs
+ return get_rx_stats(_if, diff_stats, stats, NULL, NULL, 0, MAX_FLOW_STATS - 1);
+}
+
// instead of adding this to rte_ethdev.h
extern "C" int rte_eth_fdir_stats_get(uint8_t port_id, uint32_t *stats, uint32_t start, uint32_t len);
// get rx stats on _if, between min and max
-// prev_stats should be the previous values read from the hardware.
+// prev_pkts should be the previous values read from the hardware.
// Getting changed to be equal to current HW values.
-// stats return the diff between prev_stats and current hw values
-int CTRexExtendedDriverBase40G::get_rx_stats(CPhyEthIF * _if, uint32_t *stats, uint32_t *prev_stats, int min, int max) {
+// pkts return the diff between prev_pkts and current hw values
+// bytes and prev_bytes are not used. X710 fdir filters do not support byte count.
+int CTRexExtendedDriverBase40G::get_rx_stats(CPhyEthIF * _if, uint32_t *pkts, uint32_t *prev_pkts
+ ,uint32_t *bytes, uint32_t *prev_bytes, int min, int max) {
uint32_t hw_stats[MAX_FLOW_STATS];
uint32_t port_id = _if->get_port_id();
uint32_t start = (port_id % m_if_per_card) * MAX_FLOW_STATS + min;
@@ -4954,13 +5113,13 @@ int CTRexExtendedDriverBase40G::get_rx_stats(CPhyEthIF * _if, uint32_t *stats, u
rte_eth_fdir_stats_get(port_id, hw_stats, start, len);
for (int i = loop_start; i < loop_start + len; i++) {
- if (hw_stats[i - min] >= prev_stats[i]) {
- stats[i] = (uint64_t)(hw_stats[i - min] - prev_stats[i]);
+ if (hw_stats[i - min] >= prev_pkts[i]) {
+ pkts[i] = (uint64_t)(hw_stats[i - min] - prev_pkts[i]);
} else {
// Wrap around
- stats[i] = (uint64_t)((hw_stats[i - min] + ((uint64_t)1 << 32)) - prev_stats[i]);
+ pkts[i] = (uint64_t)((hw_stats[i - min] + ((uint64_t)1 << 32)) - prev_pkts[i]);
}
- prev_stats[i] = hw_stats[i - min];
+ prev_pkts[i] = hw_stats[i - min];
}
return 0;
@@ -5025,6 +5184,12 @@ int CTRexExtendedDriverBase40G::wait_for_stable_link(){
return (0);
}
+CFlowStatParser *CTRexExtendedDriverBase40G::get_flow_stat_parser() {
+ CFlowStatParser *parser = new CFlowStatParser();
+ assert (parser);
+ return parser;
+}
+
/////////////////////////////////////////////////////////////////////
@@ -5144,6 +5309,9 @@ TrexDpdkPlatformApi::get_global_stats(TrexPlatformGlobalStats &stats) const {
g_trex.get_stats(trex_stats);
stats.m_stats.m_cpu_util = trex_stats.m_cpu_util;
+ if (get_is_stateless()) {
+ stats.m_stats.m_rx_cpu_util = trex_stats.m_rx_cpu_util;
+ }
stats.m_stats.m_tx_bps = trex_stats.m_tx_bps;
stats.m_stats.m_tx_pps = trex_stats.m_tx_pps;
@@ -5197,12 +5365,6 @@ TrexDpdkPlatformApi::get_interface_info(uint8_t interface_id, intf_info_st &info
/* hardware */
g_trex.m_ports[interface_id].macaddr_get(&rte_mac_addr);
assert(ETHER_ADDR_LEN == 6);
- printf("interface %d speed: %d mac:", interface_id, info.speed);
- for (int i = 0; i < 6; i++) {
- info.mac_info.hw_macaddr[i] = rte_mac_addr.addr_bytes[i];
- printf("%x:", rte_mac_addr.addr_bytes[i]);
- }
- printf("\n");
/* software */
uint8_t sw_macaddr[12];
@@ -5235,8 +5397,8 @@ TrexDpdkPlatformApi::get_interface_stat_info(uint8_t interface_id, uint16_t &num
capabilities = CTRexExtendedDriverDb::Ins()->get_drv()->get_rx_stat_capabilities();
}
-int TrexDpdkPlatformApi::get_flow_stats(uint8 port_id, uint64_t *rx_stats, void *tx_stats, int min, int max, bool reset) const {
- return g_trex.m_ports[port_id].get_flow_stats(rx_stats, (tx_per_flow_t *)tx_stats, min, max, reset);
+int TrexDpdkPlatformApi::get_flow_stats(uint8 port_id, void *rx_stats, void *tx_stats, int min, int max, bool reset) const {
+ return g_trex.m_ports[port_id].get_flow_stats((rx_per_flow_t *)rx_stats, (tx_per_flow_t *)tx_stats, min, max, reset);
}
int TrexDpdkPlatformApi::reset_hw_flow_stats(uint8_t port_id) const {
@@ -5268,3 +5430,8 @@ void TrexDpdkPlatformApi::flush_dp_messages() const {
int TrexDpdkPlatformApi::get_active_pgids(flow_stat_active_t &result) const {
return g_trex.m_trex_stateless->m_rx_flow_stat.get_active_pgids(result);
}
+
+CFlowStatParser *TrexDpdkPlatformApi::get_flow_stat_parser() const {
+ return CTRexExtendedDriverDb::Ins()->get_drv()
+ ->get_flow_stat_parser();
+}
diff --git a/src/main_dpdk.h b/src/main_dpdk.h
index a475d321..ff1ea784 100644
--- a/src/main_dpdk.h
+++ b/src/main_dpdk.h
@@ -38,9 +38,11 @@ class CPhyEthIFStats {
uint64_t oerrors; /**< Total number of failed transmitted packets. */
uint64_t imcasts; /**< Total number of multicast received packets. */
uint64_t rx_nombuf; /**< Total number of RX mbuf allocation failures. */
- uint64_t m_rx_per_flow [MAX_FLOW_STATS]; // Per flow RX statistics
- // Previous fdir stats values read from HW. Since on xl710 this is 32 bit, we save old value, to handle wrap around.
- uint32_t m_fdir_prev_stats [MAX_FLOW_STATS];
+ uint64_t m_rx_per_flow_pkts [MAX_FLOW_STATS]; // Per flow RX pkts
+ uint64_t m_rx_per_flow_bytes[MAX_FLOW_STATS]; // Per flow RX bytes
+ // Previous fdir stats values read from driver. Since on xl710 this is 32 bit, we save old value, to handle wrap around.
+ uint32_t m_fdir_prev_pkts [MAX_FLOW_STATS];
+ uint32_t m_fdir_prev_bytes [MAX_FLOW_STATS];
public:
void Clear();
void Dump(FILE *fd);
@@ -73,7 +75,7 @@ class CPhyEthIF {
void get_stats(CPhyEthIFStats *stats);
int dump_fdir_global_stats(FILE *fd);
int reset_hw_flow_stats();
- int get_flow_stats(uint64_t *rx_stats, tx_per_flow_t *tx_stats, int min, int max, bool reset);
+ int get_flow_stats(rx_per_flow_t *rx_stats, tx_per_flow_t *tx_stats, int min, int max, bool reset);
void get_stats_1g(CPhyEthIFStats *stats);
void rx_queue_setup(uint16_t rx_queue_id,
uint16_t nb_rx_desc,
diff --git a/src/msg_manager.cpp b/src/msg_manager.cpp
index 9ade1bfc..7e39391a 100755
--- a/src/msg_manager.cpp
+++ b/src/msg_manager.cpp
@@ -4,7 +4,7 @@
*/
/*
-Copyright (c) 2015-2015 Cisco Systems, Inc.
+Copyright (c) 2015-2016 Cisco Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -65,12 +65,12 @@ void CMessagingManager::Delete(){
delete [] m_dp_to_cp;
m_dp_to_cp = NULL;
}
-
+
if (m_cp_to_dp) {
delete [] m_cp_to_dp;
m_cp_to_dp = NULL;
}
-
+
}
CNodeRing * CMessagingManager::getRingCpToDp(uint8_t thread_id){
@@ -84,7 +84,6 @@ CNodeRing * CMessagingManager::getRingDpToCp(uint8_t thread_id){
}
-
void CMsgIns::Free(){
if (m_ins) {
m_ins->Delete();
@@ -107,6 +106,11 @@ bool CMsgIns::Create(uint8_t num_threads){
if (!res) {
return (res);
}
+ res = m_cp_rx.Create(1, "cp_rx");
+ if (!res) {
+ return (res);
+ }
+
return (m_rx_dp.Create(num_threads,"rx_dp"));
}
@@ -114,9 +118,8 @@ bool CMsgIns::Create(uint8_t num_threads){
void CMsgIns::Delete(){
m_cp_dp.Delete();
m_rx_dp.Delete();
+ m_cp_rx.Delete();
}
-CMsgIns * CMsgIns::m_ins=0;
-
-
+CMsgIns * CMsgIns::m_ins=0;
diff --git a/src/msg_manager.h b/src/msg_manager.h
index 0390ce10..de11edbd 100755
--- a/src/msg_manager.h
+++ b/src/msg_manager.h
@@ -6,7 +6,7 @@
*/
/*
-Copyright (c) 2015-2015 Cisco Systems, Inc.
+Copyright (c) 2015-2016 Cisco Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -40,37 +40,37 @@ public:
/*
-e.g DP with 4 threads
-will look like this
+e.g DP with 4 threads
+will look like this
- cp_to_dp
+ cp_to_dp
master :push
dpx : pop
-
+
- --> dp0
cp - --> dp1
- --> dp2
- --> dp3
- dp_to_cp
+ dp_to_cp
cp : pop
dpx : push
-
+
<- -- dp0
cp <- -- dp1
<- -- dp2
<- -- dp3
-*/
+*/
class CGenNode ;
typedef CTRingSp<CGenNode> CNodeRing;
-/* CP == latency thread
+/* CP == latency thread
DP == traffic pkt generator */
class CMessagingManager {
public:
@@ -83,6 +83,7 @@ public:
void Delete();
CNodeRing * getRingCpToDp(uint8_t thread_id);
CNodeRing * getRingDpToCp(uint8_t thread_id);
+ CNodeRing * getRingCpToRx();
uint8_t get_num_threads(){
return (m_num_dp_threads);
}
@@ -106,6 +107,9 @@ public:
CMessagingManager * getCpDp(){
return (&m_cp_dp);
}
+ CMessagingManager * getCpRx(){
+ return (&m_cp_rx);
+ }
uint8_t get_num_threads(){
return (m_rx_dp.get_num_threads());
@@ -114,11 +118,11 @@ public:
private:
CMessagingManager m_rx_dp;
CMessagingManager m_cp_dp;
-
+ CMessagingManager m_cp_rx;
private:
/* one instance */
- static CMsgIns * m_ins;
+ static CMsgIns * m_ins;
};
#endif
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp
index 9e24802b..9df57a50 100644
--- a/src/stateless/cp/trex_stateless.cpp
+++ b/src/stateless/cp/trex_stateless.cpp
@@ -132,6 +132,7 @@ TrexStateless::encode_stats(Json::Value &global) {
api->get_global_stats(stats);
global["cpu_util"] = stats.m_stats.m_cpu_util;
+ global["rx_cpu_util"] = stats.m_stats.m_rx_cpu_util;
global["tx_bps"] = stats.m_stats.m_tx_bps;
global["rx_bps"] = stats.m_stats.m_rx_bps;
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 5947aaf7..90589d7a 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -473,6 +473,13 @@ TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBas
ring->Enqueue((CGenNode *)msg);
}
+void
+TrexStatelessPort::send_message_to_rx(TrexStatelessCpToRxMsgBase *msg) {
+
+ /* send the message to the core */
+ CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
+ ring->Enqueue((CGenNode *)msg);
+}
uint64_t
TrexStatelessPort::get_port_speed_bps() const {
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index d3c4dcb9..7e1838d4 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -4,7 +4,7 @@
*/
/*
-Copyright (c) 2015-2015 Cisco Systems, Inc.
+Copyright (c) 2015-2016 Cisco Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -21,20 +21,21 @@ limitations under the License.
#ifndef __TREX_STATELESS_PORT_H__
#define __TREX_STATELESS_PORT_H__
-#include <trex_stream.h>
-#include <trex_dp_port_events.h>
-#include <internal_api/trex_platform_api.h>
+#include "internal_api/trex_platform_api.h"
+#include "trex_dp_port_events.h"
+#include "trex_stream.h"
class TrexStatelessCpToDpMsgBase;
+class TrexStatelessCpToRxMsgBase;
class TrexStreamsGraphObj;
class TrexPortMultiplier;
-/**
+/**
* TRex port owner can perform
* write commands
* while port is owned - others can
* do read only commands
- *
+ *
*/
class TrexPortOwner {
public:
@@ -92,7 +93,7 @@ private:
/* handler genereated internally */
std::string m_handler;
-
+
/* seed for generating random values */
unsigned int m_seed;
@@ -106,7 +107,7 @@ class AsyncStopEvent;
/**
* describes a stateless port
- *
+ *
* @author imarom (31-Aug-15)
*/
class TrexStatelessPort {
@@ -137,9 +138,9 @@ public:
RC_ERR_FAILED_TO_COMPILE_STREAMS
};
-
+
TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api);
-
+
~TrexStatelessPort();
/**
@@ -155,11 +156,11 @@ public:
void release(void);
/**
- * validate the state of the port before start
- * it will return a stream graph
- * containing information about the streams
- * configured on this port
- *
+ * validate the state of the port before start
+ * it will return a stream graph
+ * containing information about the streams
+ * configured on this port
+ *
* on error it throws TrexException
*/
const TrexStreamsGraphObj *validate(void);
@@ -190,13 +191,13 @@ public:
/**
* update current traffic on port
- *
+ *
*/
void update_traffic(const TrexPortMultiplier &mul, bool force);
/**
* get the port state
- *
+ *
*/
port_state_e get_state() const {
return m_port_state;
@@ -204,23 +205,23 @@ public:
/**
* port state as string
- *
+ *
*/
std::string get_state_as_string() const;
/**
* the the max stream id currently assigned
- *
+ *
*/
int get_max_stream_id() const;
/**
* fill up properties of the port
- *
+ *
* @author imarom (16-Sep-15)
- *
- * @param driver
- * @param speed
+ *
+ * @param driver
+ * @param speed
*/
void get_properties(std::string &driver, TrexPlatformApi::driver_speed_e &speed);
@@ -237,7 +238,7 @@ public:
/**
* delegators
- *
+ *
*/
void add_stream(TrexStream *stream);
@@ -267,7 +268,7 @@ public:
/**
* returns the number of DP cores linked to this port
- *
+ *
*/
uint8_t get_dp_core_count() {
return m_cores_id_list.size();
@@ -275,7 +276,7 @@ public:
/**
* returns the traffic multiplier currently being used by the DP
- *
+ *
*/
double get_multiplier() {
return (m_factor);
@@ -283,13 +284,13 @@ public:
/**
* get port speed in bits per second
- *
+ *
*/
uint64_t get_port_speed_bps() const;
/**
* return RX caps
- *
+ *
*/
int get_rx_caps() const {
return m_rx_caps;
@@ -300,12 +301,12 @@ public:
}
/**
- * return true if port adds CRC to a packet (not occurs for
- * VNICs)
- *
+ * return true if port adds CRC to a packet (not occurs for
+ * VNICs)
+ *
* @author imarom (24-Feb-16)
- *
- * @return bool
+ *
+ * @return bool
*/
bool has_crc_added() const {
return m_api_info.has_crc;
@@ -318,9 +319,9 @@ public:
/**
* get the port effective rate (on a started / paused port)
- *
+ *
* @author imarom (07-Jan-16)
- *
+ *
*/
void get_port_effective_rate(double &pps,
double &bps_L1,
@@ -330,8 +331,8 @@ public:
/**
* set port promiscuous on/off
- *
- * @param enabled
+ *
+ * @param enabled
*/
void set_promiscuous(bool enabled);
bool get_promiscuous();
@@ -357,40 +358,45 @@ private:
/**
* send message to all cores using duplicate
- *
+ *
*/
void send_message_to_all_dp(TrexStatelessCpToDpMsgBase *msg);
/**
* send message to specific DP core
- *
+ *
*/
void send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg);
+ /**
+ * send message to specific RX core
+ *
+ */
+ void send_message_to_rx(TrexStatelessCpToRxMsgBase *msg);
/**
* when a port stops, perform various actions
- *
+ *
*/
void common_port_stop_actions(bool async);
/**
* calculate effective M per core
- *
+ *
*/
double calculate_effective_factor(const TrexPortMultiplier &mul, bool force = false);
double calculate_effective_factor_internal(const TrexPortMultiplier &mul);
-
+
/**
* generates a graph of streams graph
- *
+ *
*/
void generate_streams_graph();
/**
* dispose of it
- *
+ *
* @author imarom (26-Nov-15)
*/
void delete_streams_graph();
@@ -426,7 +432,7 @@ private:
/**
* port multiplier object
- *
+ *
*/
class TrexPortMultiplier {
public:
@@ -443,8 +449,8 @@ public:
};
/**
- * multiplier can be absolute value
- * increment value or subtract value
+ * multiplier can be absolute value
+ * increment value or subtract value
*/
enum mul_op_e {
OP_ABS,
diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp
index be5002da..563236c2 100644
--- a/src/stateless/cp/trex_streams_compiler.cpp
+++ b/src/stateless/cp/trex_streams_compiler.cpp
@@ -477,7 +477,8 @@ TrexStreamsCompiler::compile_stream(TrexStream *stream,
TrexStream *fixed_rx_flow_stat_stream = stream->clone(true);
- get_stateless_obj()->m_rx_flow_stat.start_stream(fixed_rx_flow_stat_stream, fixed_rx_flow_stat_stream->m_rx_check.m_hw_id); //???? check for errors
+ // not checking for errors. We assume that if add_stream succeeded, start_stream will too.
+ get_stateless_obj()->m_rx_flow_stat.start_stream(fixed_rx_flow_stat_stream, fixed_rx_flow_stat_stream->m_rx_check.m_hw_id);
/* can this stream be split to many cores ? */
if (!stream->is_splitable(dp_core_count)) {
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index f8d6d828..ba25f61d 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -5,7 +5,7 @@
*/
/*
-Copyright (c) 2015-2015 Cisco Systems, Inc.
+Copyright (c) 2015-2016 Cisco Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -19,14 +19,12 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
-#include <trex_stateless_dp_core.h>
-#include <trex_stateless_messaging.h>
-#include <trex_streams_compiler.h>
-#include <trex_stream_node.h>
-#include <trex_stream.h>
-
-#include <bp_sim.h>
-
+#include "bp_sim.h"
+#include "trex_stateless_dp_core.h"
+#include "trex_stateless_messaging.h"
+#include "trex_stream.h"
+#include "trex_stream_node.h"
+#include "trex_streams_compiler.h"
void CDpOneStream::Delete(CFlowGenListPerThread * core){
assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index 333aec88..7edf0f13 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -5,7 +5,7 @@
*/
/*
-Copyright (c) 2015-2015 Cisco Systems, Inc.
+Copyright (c) 2015-2016 Cisco Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -19,17 +19,18 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
-#include <trex_stateless_messaging.h>
-#include <trex_stateless_dp_core.h>
-#include <trex_streams_compiler.h>
-#include <trex_stateless.h>
-#include <bp_sim.h>
-
#include <string.h>
+#include "trex_stateless_messaging.h"
+#include "trex_stateless_dp_core.h"
+#include "trex_stateless_rx_core.h"
+#include "trex_streams_compiler.h"
+#include "trex_stateless.h"
+#include "bp_sim.h"
+
/*************************
start traffic message
- ************************/
+ ************************/
TrexStatelessDpStart::TrexStatelessDpStart(uint8_t port_id, int event_id, TrexStreamsCompiledObj *obj, double duration) {
m_port_id = port_id;
m_event_id = event_id;
@@ -40,7 +41,7 @@ TrexStatelessDpStart::TrexStatelessDpStart(uint8_t port_id, int event_id, TrexSt
/**
* clone for DP start message
- *
+ *
*/
TrexStatelessCpToDpMsgBase *
TrexStatelessDpStart::clone() {
@@ -69,7 +70,7 @@ TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) {
/*************************
stop traffic message
- ************************/
+ ************************/
bool
TrexStatelessDpStop::handle(TrexStatelessDpCore *dp_core) {
@@ -114,7 +115,7 @@ bool TrexStatelessDpResume::handle(TrexStatelessDpCore *dp_core){
/**
* clone for DP stop message
- *
+ *
*/
TrexStatelessCpToDpMsgBase *
TrexStatelessDpStop::clone() {
@@ -130,7 +131,7 @@ TrexStatelessDpStop::clone() {
-TrexStatelessCpToDpMsgBase *
+TrexStatelessCpToDpMsgBase *
TrexStatelessDpQuit::clone(){
TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpQuit();
@@ -140,7 +141,7 @@ TrexStatelessDpQuit::clone(){
bool TrexStatelessDpQuit::handle(TrexStatelessDpCore *dp_core){
-
+
/* quit */
dp_core->quit_main_loop();
return (true);
@@ -155,7 +156,7 @@ bool TrexStatelessDpCanQuit::handle(TrexStatelessDpCore *dp_core){
return (true);
}
-TrexStatelessCpToDpMsgBase *
+TrexStatelessCpToDpMsgBase *
TrexStatelessDpCanQuit::clone(){
TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpCanQuit();
@@ -165,7 +166,7 @@ TrexStatelessDpCanQuit::clone(){
/*************************
update traffic message
- ************************/
+ ************************/
bool
TrexStatelessDpUpdate::handle(TrexStatelessDpCore *dp_core) {
dp_core->update_traffic(m_port_id, m_factor);
@@ -207,3 +208,18 @@ TrexDpPortEventMsg::handle() {
return (true);
}
+/************************* messages from CP to RX **********************/
+bool TrexStatelessRxStartMsg::handle (CRxCoreStateless *rx_core) {
+ rx_core->work();
+ return true;
+}
+
+bool TrexStatelessRxStopMsg::handle (CRxCoreStateless *rx_core) {
+ rx_core->idle();
+ return true;
+}
+
+bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) {
+ rx_core->quit();
+ return true;
+}
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index dda086b7..0eed01bd 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -5,7 +5,7 @@
*/
/*
-Copyright (c) 2015-2015 Cisco Systems, Inc.
+Copyright (c) 2015-2016 Cisco Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -22,16 +22,17 @@ limitations under the License.
#ifndef __TREX_STATELESS_MESSAGING_H__
#define __TREX_STATELESS_MESSAGING_H__
-#include <msg_manager.h>
-#include <trex_dp_port_events.h>
+#include "msg_manager.h"
+#include "trex_dp_port_events.h"
class TrexStatelessDpCore;
+class CRxCoreStateless;
class TrexStreamsCompiledObj;
class CFlowGenListPerThread;
/**
* defines the base class for CP to DP messages
- *
+ *
* @author imarom (27-Oct-15)
*/
class TrexStatelessCpToDpMsgBase {
@@ -49,7 +50,7 @@ public:
/**
* clone the current message
- *
+ *
*/
virtual TrexStatelessCpToDpMsgBase * clone() = 0;
@@ -76,7 +77,7 @@ protected:
/**
* a message to start traffic
- *
+ *
* @author imarom (27-Oct-15)
*/
class TrexStatelessDpStart : public TrexStatelessCpToDpMsgBase {
@@ -137,7 +138,7 @@ private:
/**
* a message to stop traffic
- *
+ *
* @author imarom (27-Oct-15)
*/
class TrexStatelessDpStop : public TrexStatelessCpToDpMsgBase {
@@ -191,9 +192,9 @@ private:
};
/**
- * a message to Quit the datapath traffic. support only stateless for now
- *
- * @author hhaim
+ * a message to Quit the datapath traffic. support only stateless for now
+ *
+ * @author hhaim
*/
class TrexStatelessDpQuit : public TrexStatelessCpToDpMsgBase {
public:
@@ -209,9 +210,9 @@ public:
};
/**
- * a message to check if both port are idel and exit
- *
- * @author hhaim
+ * a message to check if both port are idel and exit
+ *
+ * @author hhaim
*/
class TrexStatelessDpCanQuit : public TrexStatelessCpToDpMsgBase {
public:
@@ -247,7 +248,7 @@ private:
/**
* barrier message for DP core
- *
+ *
*/
class TrexStatelessDpBarrier : public TrexStatelessCpToDpMsgBase {
public:
@@ -270,7 +271,7 @@ private:
/**
* defines the base class for CP to DP messages
- *
+ *
* @author imarom (27-Oct-15)
*/
class TrexStatelessDpToCpMsgBase {
@@ -284,7 +285,7 @@ public:
/**
* virtual function to handle a message
- *
+ *
*/
virtual bool handle() = 0;
@@ -295,9 +296,9 @@ public:
/**
- * a message indicating an event has happened on a port at the
- * DP
- *
+ * a message indicating an event has happened on a port at the
+ * DP
+ *
*/
class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase {
public:
@@ -326,8 +327,45 @@ private:
int m_thread_id;
uint8_t m_port_id;
int m_event_id;
-
+
};
-#endif /* __TREX_STATELESS_MESSAGING_H__ */
+/************************* messages from CP to RX **********************/
+/**
+ * defines the base class for CP to RX messages
+ *
+ */
+class TrexStatelessCpToRxMsgBase {
+public:
+
+ TrexStatelessCpToRxMsgBase() {
+ }
+
+ virtual ~TrexStatelessCpToRxMsgBase() {
+ }
+
+ /**
+ * virtual function to handle a message
+ *
+ */
+ virtual bool handle (CRxCoreStateless *rx_core) = 0;
+
+ /* no copy constructor */
+ TrexStatelessCpToRxMsgBase(TrexStatelessCpToRxMsgBase &) = delete;
+
+};
+
+class TrexStatelessRxStartMsg : public TrexStatelessCpToRxMsgBase {
+ bool handle (CRxCoreStateless *rx_core);
+};
+
+class TrexStatelessRxStopMsg : public TrexStatelessCpToRxMsgBase {
+ bool handle (CRxCoreStateless *rx_core);
+};
+
+class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase {
+ bool handle (CRxCoreStateless *rx_core);
+};
+
+#endif /* __TREX_STATELESS_MESSAGING_H__ */
diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp
new file mode 100644
index 00000000..929ad7fa
--- /dev/null
+++ b/src/stateless/rx/trex_stateless_rx_core.cpp
@@ -0,0 +1,217 @@
+#include <stdio.h>
+#include "bp_sim.h"
+#include "flow_stat_parser.h"
+#include "latency.h"
+#include "trex_stateless_messaging.h"
+#include "trex_stateless_rx_core.h"
+
+void CRxCoreStateless::create(const CRxSlCfg &cfg) {
+ m_max_ports = cfg.m_max_ports;
+
+ CMessagingManager * cp_rx = CMsgIns::Ins()->getCpRx();
+
+ m_ring_from_cp = cp_rx->getRingCpToDp(0);
+ m_ring_to_cp = cp_rx->getRingDpToCp(0);
+ m_state = STATE_IDLE;
+
+ for (int i = 0; i < m_max_ports; i++) {
+ CLatencyManagerPerPort * lp = &m_ports[i];
+ lp->m_io = cfg.m_ports[i];
+ }
+ m_cpu_cp_u.Create(&m_cpu_dp_u);
+}
+
+void CRxCoreStateless::handle_cp_msg(TrexStatelessCpToRxMsgBase *msg) {
+ msg->handle(this);
+ delete msg;
+}
+
+bool CRxCoreStateless::periodic_check_for_cp_messages() {
+ /* fast path */
+ if ( likely ( m_ring_from_cp->isEmpty() ) ) {
+ return false;
+ }
+
+ while ( true ) {
+ CGenNode * node = NULL;
+
+ if (m_ring_from_cp->Dequeue(node) != 0) {
+ break;
+ }
+ assert(node);
+ TrexStatelessCpToRxMsgBase * msg = (TrexStatelessCpToRxMsgBase *)node;
+ handle_cp_msg(msg);
+ }
+
+ return true;
+
+}
+
+void CRxCoreStateless::idle_state_loop() {
+ const int SHORT_DELAY_MS = 2;
+ const int LONG_DELAY_MS = 50;
+ const int DEEP_SLEEP_LIMIT = 2000;
+
+ int counter = 0;
+
+ while (m_state == STATE_IDLE) {
+ bool had_msg = periodic_check_for_cp_messages();
+ if (had_msg) {
+ counter = 0;
+ continue;
+ }
+
+ /* enter deep sleep only if enough time had passed */
+ if (counter < DEEP_SLEEP_LIMIT) {
+ delay(SHORT_DELAY_MS);
+ counter++;
+ } else {
+ delay(LONG_DELAY_MS);
+ }
+ }
+}
+
+void CRxCoreStateless::start() {
+ static int count = 0;
+ static int i = 0;
+ bool do_try_rx_queue =CGlobalInfo::m_options.preview.get_vm_one_queue_enable() ? true : false;
+
+ while (true) {
+ if (m_state == STATE_WORKING) {
+ i++;
+ if (i == 100) {
+ i = 0;
+ // if no packets in 100 cycles, sleep for a while to spare the cpu
+ if (count == 0) {
+ delay(1);
+ }
+ count = 0;
+ periodic_check_for_cp_messages(); // m_state might change in here
+ }
+ } else {
+ if (m_state == STATE_QUIT)
+ break;
+ idle_state_loop();
+ }
+ if (do_try_rx_queue) {
+ try_rx_queues();
+ }
+ count += try_rx();
+ }
+}
+
+void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPort *lp, rte_mbuf_t *m) {
+ CFlowStatParser parser;
+
+ if (parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) {
+ uint16_t ip_id;
+ if (parser.get_ip_id(ip_id) == 0) {
+ if (is_flow_stat_id(ip_id)) {
+ uint16_t hw_id = get_hw_id(ip_id);
+ lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1);
+ lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len);
+ }
+ }
+ }
+}
+
+// In VM setup, handle packets coming as messages from DP cores.
+void CRxCoreStateless::handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r) {
+ while ( true ) {
+ CGenNode * node;
+ if ( r->Dequeue(node) != 0 ) {
+ break;
+ }
+ assert(node);
+
+ CGenNodeMsgBase * msg = (CGenNodeMsgBase *)node;
+ CGenNodeLatencyPktInfo * l_msg;
+ uint8_t msg_type = msg->m_msg_type;
+ uint8_t rx_port_index;
+ CLatencyManagerPerPort * lp;
+
+ switch (msg_type) {
+ case CGenNodeMsgBase::LATENCY_PKT:
+ l_msg = (CGenNodeLatencyPktInfo *)msg;
+ assert(l_msg->m_latency_offset == 0xdead);
+ rx_port_index = (thread_id << 1) + (l_msg->m_dir & 1);
+ assert( rx_port_index < m_max_ports );
+ lp = &m_ports[rx_port_index];
+ handle_rx_pkt(lp, (rte_mbuf_t *)l_msg->m_pkt);
+ break;
+ default:
+ printf("ERROR latency-thread message type is not valid %d \n", msg_type);
+ assert(0);
+ }
+
+ CGlobalInfo::free_node(node);
+ }
+}
+
+// VM mode function. Handle messages from DP
+void CRxCoreStateless::try_rx_queues() {
+
+ CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp();
+ uint8_t threads=CMsgIns::Ins()->get_num_threads();
+ int ti;
+ for (ti = 0; ti < (int)threads; ti++) {
+ CNodeRing * r = rx_dp->getRingDpToCp(ti);
+ if ( ! r->isEmpty() ) {
+ handle_rx_queue_msgs((uint8_t)ti, r);
+ }
+ }
+}
+
+int CRxCoreStateless::try_rx() {
+ rte_mbuf_t * rx_pkts[64];
+ int i, total_pkts = 0;
+ for (i = 0; i < m_max_ports; i++) {
+ CLatencyManagerPerPort * lp = &m_ports[i];
+ rte_mbuf_t * m;
+ m_cpu_dp_u.start_work();
+ /* try to read 64 packets clean up the queue */
+ uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64);
+ total_pkts += cnt_p;
+ if (cnt_p) {
+ int j;
+ for (j = 0; j < cnt_p; j++) {
+ m = rx_pkts[j];
+ handle_rx_pkt(lp, m);
+ rte_pktmbuf_free(m);
+ }
+ /* commit only if there was work to do ! */
+ m_cpu_dp_u.commit();
+ }/* if work */
+ }// all ports
+ return total_pkts;
+}
+
+bool CRxCoreStateless::is_flow_stat_id(uint16_t id) {
+ if ((id & 0xff00) == IP_ID_RESERVE_BASE) return true;
+ return false;
+}
+
+uint16_t CRxCoreStateless::get_hw_id(uint16_t id) {
+ return (0x00ff & id);
+}
+
+void CRxCoreStateless::reset_rx_stats(uint8_t port_id) {
+ for (int hw_id = 0; hw_id < MAX_FLOW_STATS; hw_id++) {
+ m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear();
+ }
+}
+
+int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset) {
+ for (int hw_id = min; hw_id <= max; hw_id++) {
+ rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat[hw_id];
+ if (reset) {
+ m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear();
+ }
+ }
+ return 0;
+}
+
+double CRxCoreStateless::get_cpu_util() {
+ m_cpu_cp_u.Update();
+ return m_cpu_cp_u.GetVal();
+}
diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h
new file mode 100644
index 00000000..5ab12f4e
--- /dev/null
+++ b/src/stateless/rx/trex_stateless_rx_core.h
@@ -0,0 +1,80 @@
+/*
+ Ido Barnea
+ Cisco Systems, Inc.
+*/
+
+/*
+ Copyright (c) 2016-2016 Cisco Systems, Inc.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+#ifndef __TREX_STATELESS_RX_CORE_H__
+#define __TREX_STATELESS_RX_CORE_H__
+#include <stdint.h>
+#include "latency.h"
+#include "utl_cpuu.h"
+
+class TrexStatelessCpToRxMsgBase;
+
+class CRxSlCfg {
+ public:
+ CRxSlCfg (){
+ m_max_ports = 0;
+ m_cps = 0.0;
+ }
+
+ public:
+ uint32_t m_max_ports;
+ double m_cps;
+ CPortLatencyHWBase * m_ports[TREX_MAX_PORTS];
+};
+
+class CRxCoreStateless {
+ enum state_e {
+ STATE_IDLE,
+ STATE_WORKING,
+ STATE_QUIT
+ };
+
+ public:
+ void start();
+ void create(const CRxSlCfg &cfg);
+ void reset_rx_stats(uint8_t port_id);
+ int get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset);
+ void work() {m_state = STATE_WORKING;}
+ void idle() {m_state = STATE_IDLE;}
+ void quit() {m_state = STATE_QUIT;}
+ double get_cpu_util();
+
+ private:
+ void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg);
+ bool periodic_check_for_cp_messages();
+ void idle_state_loop();
+ void handle_rx_pkt(CLatencyManagerPerPort * lp, rte_mbuf_t * m);
+ void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r);
+ int try_rx();
+ void try_rx_queues();
+ bool is_flow_stat_id(uint16_t id);
+ uint16_t get_hw_id(uint16_t id);
+
+ private:
+ uint32_t m_max_ports;
+ bool m_has_streams;
+ CLatencyManagerPerPort m_ports[TREX_MAX_PORTS];
+ state_e m_state; /* state of all ports */
+ CNodeRing *m_ring_from_cp;
+ CNodeRing *m_ring_to_cp;
+ CCpuUtlDp m_cpu_dp_u;
+ CCpuUtlCp m_cpu_cp_u;
+};
+#endif