diff options
Diffstat (limited to 'src/stateless')
-rw-r--r-- | src/stateless/cp/trex_stateless.cpp | 162 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless.h | 74 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.cpp | 143 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.h | 76 | ||||
-rw-r--r-- | src/stateless/cp/trex_stream.cpp | 17 | ||||
-rw-r--r-- | src/stateless/cp/trex_stream.h | 17 | ||||
-rw-r--r-- | src/stateless/cp/trex_streams_compiler.cpp | 81 | ||||
-rw-r--r-- | src/stateless/cp/trex_streams_compiler.h | 68 | ||||
-rw-r--r-- | src/stateless/dp/trex_stateless_dp_core.cpp | 210 | ||||
-rw-r--r-- | src/stateless/dp/trex_stateless_dp_core.h | 105 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.cpp | 53 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.h | 79 |
12 files changed, 665 insertions, 420 deletions
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index 72762e26..e0e95450 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -31,55 +31,58 @@ using namespace std; * Trex stateless object * **********************************************************/ -TrexStateless::TrexStateless() { - m_is_configured = false; -} - /** - * configure the singleton stateless object * */ -void TrexStateless::configure(const TrexStatelessCfg &cfg) { - - TrexStateless& instance = get_instance_internal(); - - /* check status */ - if (instance.m_is_configured) { - throw TrexException("re-configuration of stateless object is not allowed"); - } +TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) { /* create RPC servers */ /* set both servers to mutex each other */ - instance.m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_rpc_async_cfg, &instance.m_global_cp_lock); - instance.m_rpc_server->set_verbose(cfg.m_rpc_server_verbose); + m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_rpc_async_cfg, &m_global_cp_lock); + m_rpc_server->set_verbose(cfg.m_rpc_server_verbose); /* configure ports */ + m_port_count = cfg.m_port_count; - instance.m_port_count = cfg.m_port_count; - - for (int i = 0; i < instance.m_port_count; i++) { - instance.m_ports.push_back(new TrexStatelessPort(i)); + for (int i = 0; i < m_port_count; i++) { + m_ports.push_back(new TrexStatelessPort(i)); } - /* cores */ - instance.m_dp_core_count = cfg.m_dp_core_count; - for (int i = 0; i < instance.m_dp_core_count; i++) { - instance.m_dp_cores.push_back(new TrexStatelessDpCore(i)); + m_platform_api = cfg.m_platform_api; +} + +/** + * release all memory + * + * @author imarom (08-Oct-15) + */ +TrexStateless::~TrexStateless() { + + /* release memory for ports */ + for (auto port : m_ports) { + delete port; } + m_ports.clear(); + + /* stops the RPC server */ + m_rpc_server->stop(); + delete m_rpc_server; + + m_rpc_server = NULL; - /* done */ - instance.m_is_configured = true; + delete m_platform_api; + m_platform_api = NULL; } + /** * starts the control plane side * */ void TrexStateless::launch_control_plane() { - //std::cout << "\n on control/master core \n"; /* pin this process to the current running CPU any new thread will be called on the same CPU @@ -94,39 +97,6 @@ TrexStateless::launch_control_plane() { m_rpc_server->start(); } -void -TrexStateless::launch_on_dp_core(uint8_t core_id) { - m_dp_cores[core_id - 1]->run(); -} - -/** - * destroy the singleton and release all memory - * - * @author imarom (08-Oct-15) - */ -void -TrexStateless::destroy() { - TrexStateless& instance = get_instance_internal(); - - if (!instance.m_is_configured) { - return; - } - - /* release memory for ports */ - for (auto port : instance.m_ports) { - delete port; - } - instance.m_ports.clear(); - - /* stops the RPC server */ - instance.m_rpc_server->stop(); - delete instance.m_rpc_server; - - instance.m_rpc_server = NULL; - - /* done */ - instance.m_is_configured = false; -} /** * fetch a port by ID @@ -148,57 +118,32 @@ TrexStateless::get_port_count() { uint8_t TrexStateless::get_dp_core_count() { - return m_dp_core_count; -} - -void -TrexStateless::update_stats() { - - /* update CPU util. - TODO - */ - m_stats.m_stats.m_cpu_util = 0; - - /* for every port update and accumulate */ - for (uint8_t i = 0; i < m_port_count; i++) { - m_ports[i]->update_stats(); - - const TrexPortStats & port_stats = m_ports[i]->get_stats(); - - m_stats.m_stats.m_tx_bps += port_stats.m_stats.m_tx_bps; - m_stats.m_stats.m_rx_bps += port_stats.m_stats.m_rx_bps; - - m_stats.m_stats.m_tx_pps += port_stats.m_stats.m_tx_pps; - m_stats.m_stats.m_rx_pps += port_stats.m_stats.m_rx_pps; - - m_stats.m_stats.m_total_tx_pkts += port_stats.m_stats.m_total_tx_pkts; - m_stats.m_stats.m_total_rx_pkts += port_stats.m_stats.m_total_rx_pkts; - - m_stats.m_stats.m_total_tx_bytes += port_stats.m_stats.m_total_tx_bytes; - m_stats.m_stats.m_total_rx_bytes += port_stats.m_stats.m_total_rx_bytes; - - m_stats.m_stats.m_tx_rx_errors += port_stats.m_stats.m_tx_rx_errors; - } + return m_platform_api->get_dp_core_count(); } void TrexStateless::encode_stats(Json::Value &global) { - global["cpu_util"] = m_stats.m_stats.m_cpu_util; + const TrexPlatformApi *api = get_stateless_obj()->get_platform_api(); + + TrexPlatformGlobalStats stats; + api->get_global_stats(stats); - global["tx_bps"] = m_stats.m_stats.m_tx_bps; - global["rx_bps"] = m_stats.m_stats.m_rx_bps; + global["cpu_util"] = stats.m_stats.m_cpu_util; - global["tx_pps"] = m_stats.m_stats.m_tx_pps; - global["rx_pps"] = m_stats.m_stats.m_rx_pps; + global["tx_bps"] = stats.m_stats.m_tx_bps; + global["rx_bps"] = stats.m_stats.m_rx_bps; - global["total_tx_pkts"] = Json::Value::UInt64(m_stats.m_stats.m_total_tx_pkts); - global["total_rx_pkts"] = Json::Value::UInt64(m_stats.m_stats.m_total_rx_pkts); + global["tx_pps"] = stats.m_stats.m_tx_pps; + global["rx_pps"] = stats.m_stats.m_rx_pps; - global["total_tx_bytes"] = Json::Value::UInt64(m_stats.m_stats.m_total_tx_bytes); - global["total_rx_bytes"] = Json::Value::UInt64(m_stats.m_stats.m_total_rx_bytes); + global["total_tx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_tx_pkts); + global["total_rx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_rx_pkts); - global["tx_rx_errors"] = Json::Value::UInt64(m_stats.m_stats.m_tx_rx_errors); + global["total_tx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_tx_bytes); + global["total_rx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_rx_bytes); + + global["tx_rx_errors"] = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors); for (uint8_t i = 0; i < m_port_count; i++) { std::stringstream ss; @@ -210,3 +155,20 @@ TrexStateless::encode_stats(Json::Value &global) { } } +/** + * generate a snapshot for publish (async publish) + * + */ +void +TrexStateless::generate_publish_snapshot(std::string &snapshot) { + Json::FastWriter writer; + Json::Value root; + + root["name"] = "trex-stateless-info"; + root["type"] = 0; + + /* stateless specific info goes here */ + root["data"] = Json::nullValue; + + snapshot = writer.write(root); +} diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index 649b25dd..758707a2 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -29,9 +29,10 @@ limitations under the License. #include <trex_stream.h> #include <trex_stateless_port.h> -#include <trex_stateless_dp_core.h> #include <trex_rpc_server_api.h> +#include <internal_api/trex_platform_api.h> + /** * generic exception for errors * TODO: move this to a better place @@ -88,17 +89,17 @@ public: /* default values */ TrexStatelessCfg() { m_port_count = 0; - m_dp_core_count = 0; m_rpc_req_resp_cfg = NULL; m_rpc_async_cfg = NULL; - m_rpc_server_verbose = false; + m_rpc_server_verbose = false; + m_platform_api = NULL; } const TrexRpcServerConfig *m_rpc_req_resp_cfg; const TrexRpcServerConfig *m_rpc_async_cfg; + const TrexPlatformApi *m_platform_api; bool m_rpc_server_verbose; uint8_t m_port_count; - uint8_t m_dp_core_count; }; /** @@ -113,27 +114,8 @@ public: * reconfiguration is not allowed * an exception will be thrown */ - static void configure(const TrexStatelessCfg &cfg); - - /** - * destroy the instance - * - */ - static void destroy(); - - /** - * singleton public get instance - * - */ - static TrexStateless& get_instance() { - TrexStateless& instance = get_instance_internal(); - - if (!instance.m_is_configured) { - throw TrexException("object is not configured"); - } - - return instance; - } + TrexStateless(const TrexStatelessCfg &cfg); + ~TrexStateless(); /** * starts the control plane side @@ -152,12 +134,6 @@ public: uint8_t get_dp_core_count(); - /** - * update all the stats (deep update) - * (include all the ports and global stats) - * - */ - void update_stats(); /** * fetch all the stats @@ -165,22 +141,21 @@ public: */ void encode_stats(Json::Value &global); + /** + * generate a snapshot for publish + */ + void generate_publish_snapshot(std::string &snapshot); -protected: - TrexStateless(); - - static TrexStateless& get_instance_internal () { - static TrexStateless instance; - return instance; + const TrexPlatformApi * get_platform_api() { + return (m_platform_api); } - /* c++ 2011 style singleton */ +protected: + + /* no copy or assignment */ TrexStateless(TrexStateless const&) = delete; void operator=(TrexStateless const&) = delete; - /* status */ - bool m_is_configured; - /* RPC server array */ TrexRpcServer *m_rpc_server; @@ -188,15 +163,20 @@ protected: std::vector <TrexStatelessPort *> m_ports; uint8_t m_port_count; - /* cores */ - std::vector <TrexStatelessDpCore *> m_dp_cores; - uint8_t m_dp_core_count; - - /* stats */ - TrexStatelessStats m_stats; + /* platform API */ + const TrexPlatformApi *m_platform_api; std::mutex m_global_cp_lock; }; +/** + * an anchor function + * + * @author imarom (25-Oct-15) + * + * @return TrexStateless& + */ +TrexStateless * get_stateless_obj(); + #endif /* __TREX_STATELESS_H__ */ diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index a31847a5..375d1f63 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -18,8 +18,12 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + #include <trex_stateless.h> #include <trex_stateless_port.h> +#include <trex_stateless_messaging.h> +#include <trex_streams_compiler.h> + #include <string> #ifndef TREX_RPC_MOCK_SERVER @@ -59,19 +63,52 @@ TrexStatelessPort::start_traffic(void) { return (RC_ERR_NO_STREAMS); } + /* fetch all the streams from the table */ + vector<TrexStream *> streams; + get_stream_table()->get_object_list(streams); + + /* compiler it */ + TrexStreamsCompiler compiler; + TrexStreamsCompiledObj *compiled_obj = new TrexStreamsCompiledObj(); + + bool rc = compiler.compile(streams, *compiled_obj); + if (!rc) { + return (RC_ERR_FAILED_TO_COMPILE_STREAMS); + } + + /* generate a message to all the relevant DP cores to start transmitting */ + TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(compiled_obj); + + // FIXME (add the right core list) + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(0); + + ring->Enqueue((CGenNode *)start_msg); + + /* move the state to transmiting */ m_port_state = PORT_STATE_TRANSMITTING; - /* real code goes here */ return (RC_OK); } -void +TrexStatelessPort::rc_e TrexStatelessPort::stop_traffic(void) { /* real code goes here */ - if (m_port_state == PORT_STATE_TRANSMITTING) { - m_port_state = PORT_STATE_UP_IDLE; + if (m_port_state != PORT_STATE_TRANSMITTING) { + return (RC_ERR_BAD_STATE_FOR_OP); } + + /* generate a message to all the relevant DP cores to start transmitting */ + TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(); + + // FIXME (add the right core list) + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(0); + + ring->Enqueue((CGenNode *)stop_msg); + + m_port_state = PORT_STATE_UP_IDLE; + + return (RC_OK); } /** @@ -130,99 +167,27 @@ TrexStatelessPort::generate_handler() { return (ss.str()); } -/** - * update stats for the port - * - */ -void -TrexStatelessPort::update_stats() { - struct rte_eth_stats stats; - rte_eth_stats_get(m_port_id, &stats); - - /* copy straight values */ - m_stats.m_stats.m_total_tx_bytes = stats.obytes; - m_stats.m_stats.m_total_rx_bytes = stats.ibytes; - - m_stats.m_stats.m_total_tx_pkts = stats.opackets; - m_stats.m_stats.m_total_rx_pkts = stats.ipackets; - - /* calculate stats */ - m_stats.m_stats.m_tx_bps = m_stats.m_bw_tx_bps.add(stats.obytes); - m_stats.m_stats.m_rx_bps = m_stats.m_bw_rx_bps.add(stats.ibytes); - - m_stats.m_stats.m_tx_pps = m_stats.m_bw_tx_pps.add(stats.opackets); - m_stats.m_stats.m_rx_pps = m_stats.m_bw_rx_pps.add(stats.ipackets); - -} - -const TrexPortStats & -TrexStatelessPort::get_stats() { - return m_stats; -} void TrexStatelessPort::encode_stats(Json::Value &port) { - port["tx_bps"] = m_stats.m_stats.m_tx_bps; - port["rx_bps"] = m_stats.m_stats.m_rx_bps; - - port["tx_pps"] = m_stats.m_stats.m_tx_pps; - port["rx_pps"] = m_stats.m_stats.m_rx_pps; - - port["total_tx_pkts"] = Json::Value::UInt64(m_stats.m_stats.m_total_tx_pkts); - port["total_rx_pkts"] = Json::Value::UInt64(m_stats.m_stats.m_total_rx_pkts); - - port["total_tx_bytes"] = Json::Value::UInt64(m_stats.m_stats.m_total_tx_bytes); - port["total_rx_bytes"] = Json::Value::UInt64(m_stats.m_stats.m_total_rx_bytes); - - port["tx_rx_errors"] = Json::Value::UInt64(m_stats.m_stats.m_tx_rx_errors); -} - + const TrexPlatformApi *api = get_stateless_obj()->get_platform_api(); + TrexPlatformInterfaceStats stats; + api->get_interface_stats(m_port_id, stats); -/*************************** - * BW measurement - * - **************************/ -/* TODO: move this to a common place */ -BWMeasure::BWMeasure() { - reset(); -} - -void BWMeasure::reset(void) { - m_start=false; - m_last_time_msec=0; - m_last_bytes=0; - m_last_result=0.0; -}; - -double BWMeasure::calc_MBsec(uint32_t dtime_msec, - uint64_t dbytes){ - double rate=0.000008*( ( (double)dbytes*(double)os_get_time_freq())/((double)dtime_msec) ); - return(rate); -} + port["tx_bps"] = stats.m_stats.m_tx_bps; + port["rx_bps"] = stats.m_stats.m_rx_bps; -double BWMeasure::add(uint64_t size) { - if ( false == m_start ) { - m_start=true; - m_last_time_msec = os_get_time_msec() ; - m_last_bytes=size; - return(0.0); - } - - uint32_t ctime=os_get_time_msec(); - if ((ctime - m_last_time_msec) <os_get_time_freq() ) { - return(m_last_result); - } + port["tx_pps"] = stats.m_stats.m_tx_pps; + port["rx_pps"] = stats.m_stats.m_rx_pps; - uint32_t dtime_msec = ctime-m_last_time_msec; - uint64_t dbytes = size - m_last_bytes; + port["total_tx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_tx_pkts); + port["total_rx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_rx_pkts); - m_last_time_msec = ctime; - m_last_bytes = size; - - m_last_result= 0.5*calc_MBsec(dtime_msec,dbytes) +0.5*(m_last_result); - return( m_last_result ); + port["total_tx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_tx_bytes); + port["total_rx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_rx_bytes); + + port["tx_rx_errors"] = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors); } - diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index 428d5aee..4851a4b5 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -24,71 +24,6 @@ limitations under the License. #include <trex_stream.h> /** - * bandwidth measurement class - * - */ -class BWMeasure { -public: - BWMeasure(); - void reset(void); - double add(uint64_t size); - -private: - double calc_MBsec(uint32_t dtime_msec, - uint64_t dbytes); - -public: - bool m_start; - uint32_t m_last_time_msec; - uint64_t m_last_bytes; - double m_last_result; -}; - -/** - * TRex stateless port stats - * - * @author imarom (24-Sep-15) - */ -class TrexPortStats { - -public: - TrexPortStats() { - m_stats = {0}; - - m_bw_tx_bps.reset(); - m_bw_rx_bps.reset(); - - m_bw_tx_pps.reset(); - m_bw_rx_pps.reset(); - } - -public: - - BWMeasure m_bw_tx_bps; - BWMeasure m_bw_rx_bps; - - BWMeasure m_bw_tx_pps; - BWMeasure m_bw_rx_pps; - - struct { - - double m_tx_bps; - double m_rx_bps; - - double m_tx_pps; - double m_rx_pps; - - uint64_t m_total_tx_pkts; - uint64_t m_total_rx_pkts; - - uint64_t m_total_tx_bytes; - uint64_t m_total_rx_bytes; - - uint64_t m_tx_rx_errors; - } m_stats; -}; - -/** * describes a stateless port * * @author imarom (31-Aug-15) @@ -127,7 +62,7 @@ public: * stop traffic * */ - void stop_traffic(void); + rc_e stop_traffic(void); /** * access the stream table @@ -203,14 +138,6 @@ public: } /** - * update the values of the stats - * - */ - void update_stats(); - - const TrexPortStats & get_stats(); - - /** * encode stats as JSON */ void encode_stats(Json::Value &port); @@ -224,7 +151,6 @@ private: port_state_e m_port_state; std::string m_owner; std::string m_owner_handler; - TrexPortStats m_stats; }; #endif /* __TREX_STATELESS_PORT_H__ */ diff --git a/src/stateless/cp/trex_stream.cpp b/src/stateless/cp/trex_stream.cpp index 182036f1..ba306137 100644 --- a/src/stateless/cp/trex_stream.cpp +++ b/src/stateless/cp/trex_stream.cpp @@ -20,6 +20,7 @@ limitations under the License. */ #include <trex_stream.h> #include <cstddef> +#include <string.h> /************************************** * stream @@ -103,14 +104,24 @@ TrexStream * TrexStreamTable::get_stream_by_id(uint32_t stream_id) { } } -void TrexStreamTable::get_stream_list(std::vector<uint32_t> &stream_list) { - stream_list.clear(); +void TrexStreamTable::get_id_list(std::vector<uint32_t> &id_list) { + id_list.clear(); for (auto stream : m_stream_table) { - stream_list.push_back(stream.first); + id_list.push_back(stream.first); } } +void TrexStreamTable::get_object_list(std::vector<TrexStream *> &object_list) { + object_list.clear(); + + for (auto stream : m_stream_table) { + object_list.push_back(stream.second); + } + +} + int TrexStreamTable::size() { return m_stream_table.size(); } + diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h index f5bc96ef..c8a15240 100644 --- a/src/stateless/cp/trex_stream.h +++ b/src/stateless/cp/trex_stream.h @@ -37,10 +37,6 @@ class TrexRpcCmdAddStream; * */ class TrexStream { - /* provide the RPC parser a way to access private fields */ - friend class TrexRpcCmdAddStream; - friend class TrexRpcCmdGetStream; - friend class TrexStreamTable; public: TrexStream(uint8_t port_id, uint32_t stream_id); @@ -56,7 +52,7 @@ public: /* access the stream json */ const Json::Value & get_stream_json(); -protected: +public: /* basic */ uint8_t m_port_id; uint32_t m_stream_id; @@ -189,7 +185,13 @@ public: * * @param stream_list */ - void get_stream_list(std::vector<uint32_t> &stream_list); + void get_id_list(std::vector<uint32_t> &id_list); + + /** + * populate a list with all the stream objects + * + */ + void get_object_list(std::vector<TrexStream *> &object_list); /** * get the table size @@ -197,6 +199,9 @@ public: */ int size(); + std::unordered_map<int, TrexStream *>::iterator begin() {return m_stream_table.begin();} + std::unordered_map<int, TrexStream *>::iterator end() {return m_stream_table.end();} + private: /** * holds all the stream in a hash table by stream id diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp new file mode 100644 index 00000000..6c77ad0f --- /dev/null +++ b/src/stateless/cp/trex_streams_compiler.cpp @@ -0,0 +1,81 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include <string.h> +#include <trex_streams_compiler.h> +#include <trex_stream.h> + +/************************************** + * stream compiled object + *************************************/ +TrexStreamsCompiledObj::~TrexStreamsCompiledObj() { + for (auto &obj : m_objs) { + delete obj.m_pkt; + } + m_objs.clear(); +} + +void +TrexStreamsCompiledObj::add_compiled_stream(double pps, uint8_t *pkt, uint16_t pkt_len) { + obj_st obj; + + obj.m_pps = pps; + obj.m_pkt_len = pkt_len; + + obj.m_pkt = new uint8_t[pkt_len]; + memcpy(obj.m_pkt, pkt, pkt_len); + + m_objs.push_back(obj); +} + +/************************************** + * stream compiler + *************************************/ +bool +TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj) { + /* for now we do something trivial, */ + for (auto stream : streams) { + + /* skip non-enabled streams */ + if (!stream->m_enabled) { + continue; + } + + /* for now skip also non self started streams */ + if (!stream->m_self_start) { + continue; + } + + /* for now support only continous ... */ + TrexStreamContinuous *cont_stream = dynamic_cast<TrexStreamContinuous *>(stream); + if (!cont_stream) { + continue; + } + + /* add it */ + obj.add_compiled_stream(cont_stream->get_pps(), + cont_stream->m_pkt.binary, + cont_stream->m_pkt.len); + } + + return true; +} + diff --git a/src/stateless/cp/trex_streams_compiler.h b/src/stateless/cp/trex_streams_compiler.h new file mode 100644 index 00000000..90253cdf --- /dev/null +++ b/src/stateless/cp/trex_streams_compiler.h @@ -0,0 +1,68 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_STREAMS_COMPILER_H__ +#define __TREX_STREAMS_COMPILER_H__ + +#include <stdint.h> +#include <vector> + +class TrexStreamsCompiler; +class TrexStream; + +/** + * compiled object for a table of streams + * + * @author imarom (28-Oct-15) + */ +class TrexStreamsCompiledObj { + friend class TrexStreamsCompiler; +public: + + TrexStreamsCompiledObj() {} + ~TrexStreamsCompiledObj(); + + struct obj_st { + double m_pps; + uint8_t *m_pkt; + uint16_t m_pkt_len; + }; + + const std::vector<obj_st> & get_objects() { + return m_objs; + } + +private: + void add_compiled_stream(double pps, uint8_t *pkt, uint16_t pkt_len); + std::vector<obj_st> m_objs; +}; + +class TrexStreamsCompiler { +public: + /** + * compiles a vector of streams to an object passable to the DP + * + * @author imarom (28-Oct-15) + * + */ + bool compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj); +}; + +#endif /* __TREX_STREAMS_COMPILER_H__ */ diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index 3755b82c..73387f0e 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -18,118 +18,148 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ - #include <trex_stateless_dp_core.h> -#include <stdio.h> -#include <unistd.h> -#include <trex_stateless.h> +#include <trex_stateless_messaging.h> +#include <trex_streams_compiler.h> #include <bp_sim.h> -#ifndef TREX_RPC_MOCK_SERVER +/** + * extended info for the stateless node + * TODO: + * static_assert(sizeof(dp_node_extended_info_st) <= sizeof(CGenNodeStateless::m_pad_end), "hello"); + */ +typedef struct dp_node_extended_info_ { + double next_time_offset; + uint8_t is_stream_active; -// DPDK c++ issue -#define UINT8_MAX 255 -#define UINT16_MAX 0xFFFF -// DPDK c++ issue -#endif +} dp_node_extended_info_st; -#include <rte_ethdev.h> -#include "mbuf.h" +TrexStatelessDpCore::TrexStatelessDpCore(uint8_t thread_id, CFlowGenListPerThread *core) { + m_thread_id = thread_id; + m_core = core; -/** - * TEST - * - */ -static const uint8_t udp_pkt[]={ - 0x00,0x00,0x00,0x01,0x00,0x00, - 0x00,0x00,0x00,0x01,0x00,0x00, - 0x08,0x00, - - 0x45,0x00,0x00,0x81, - 0xaf,0x7e,0x00,0x00, - 0x12,0x11,0xd9,0x23, - 0x01,0x01,0x01,0x01, - 0x3d,0xad,0x72,0x1b, - - 0x11,0x11, - 0x11,0x11, - - 0x00,0x6d, - 0x00,0x00, - - 0x64,0x31,0x3a,0x61, - 0x64,0x32,0x3a,0x69,0x64, - 0x32,0x30,0x3a,0xd0,0x0e, - 0xa1,0x4b,0x7b,0xbd,0xbd, - 0x16,0xc6,0xdb,0xc4,0xbb,0x43, - 0xf9,0x4b,0x51,0x68,0x33,0x72, - 0x20,0x39,0x3a,0x69,0x6e,0x66,0x6f, - 0x5f,0x68,0x61,0x73,0x68,0x32,0x30,0x3a,0xee,0xc6,0xa3, - 0xd3,0x13,0xa8,0x43,0x06,0x03,0xd8,0x9e,0x3f,0x67,0x6f, - 0xe7,0x0a,0xfd,0x18,0x13,0x8d,0x65,0x31,0x3a,0x71,0x39, - 0x3a,0x67,0x65,0x74,0x5f,0x70,0x65,0x65,0x72,0x73,0x31, - 0x3a,0x74,0x38,0x3a,0x3d,0xeb,0x0c,0xbf,0x0d,0x6a,0x0d, - 0xa5,0x31,0x3a,0x79,0x31,0x3a,0x71,0x65,0x87,0xa6,0x7d, - 0xe7 -}; - -static int -test_inject_pkt(uint8_t *pkt, uint32_t pkt_size) { - - #ifndef TREX_RPC_MOCK_SERVER - rte_mempool_t * mp= CGlobalInfo::m_mem_pool[0].m_big_mbuf_pool ; - #else - rte_mempool_t * mp = NULL; - #endif - - rte_mbuf_t *m = rte_pktmbuf_alloc(mp); - if ( unlikely(m==0) ) { - printf("ERROR no packets \n"); - return (-1); + CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp(); + + m_ring_from_cp = cp_dp->getRingCpToDp(thread_id); + m_ring_to_cp = cp_dp->getRingDpToCp(thread_id); + + m_state = STATE_IDLE; +} + +void +TrexStatelessDpCore::start() { + + /* creates a maintenace job using the scheduler */ + CGenNode * node_sync = m_core->create_node() ; + node_sync->m_type = CGenNode::FLOW_SYNC; + node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT; + m_core->m_node_gen.add_node(node_sync); + + double old_offset = 0.0; + m_core->m_node_gen.flush_file(100000000, 0.0, false, m_core, old_offset); + +} + +void +TrexStatelessDpCore::handle_pkt_event(CGenNode *node) { + + //TODO: optimize the fast path here... + + CGenNodeStateless *node_sl = (CGenNodeStateless *)node; + dp_node_extended_info_st *opaque = (dp_node_extended_info_st *)node_sl->get_opaque_storage(); + + /* is this stream active ? */ + if (!opaque->is_stream_active) { + m_core->free_node(node); + return; } + + m_core->m_node_gen.m_v_if->send_node(node); + + /* in case of continues */ + node->m_time += opaque->next_time_offset; + + /* insert a new event */ + m_core->m_node_gen.m_p_queue.push(node); +} + +void +TrexStatelessDpCore::add_cont_stream(double pps, const uint8_t *pkt, uint16_t pkt_len) { + CGenNodeStateless *node = m_core->create_node_sl(); + + /* add periodic */ + node->m_type = CGenNode::STATELESS_PKT; + node->m_time = m_core->m_cur_time_sec + 0.0 /* STREAM ISG */; + node->m_flags = 0; + + /* set socket id */ + node->set_socket_id(m_core->m_node_gen.m_socket_id); + + /* build a mbuf from a packet */ + uint16_t pkt_size = pkt_len; + const uint8_t *stream_pkt = pkt; + + dp_node_extended_info_st *opaque = (dp_node_extended_info_st *)node->get_opaque_storage(); + opaque->next_time_offset = 1.0 / pps; + opaque->is_stream_active = 1; + + /* allocate const mbuf */ + rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size); + assert(m); + char *p = rte_pktmbuf_append(m, pkt_size); assert(p); - /* set pkt data */ - memcpy(p,pkt,pkt_size); + /* copy the packet */ + memcpy(p,stream_pkt,pkt_size); - rte_mbuf_t *tx_pkts[32]; - tx_pkts[0] = m; - uint8_t nb_pkts = 1; - uint16_t ret = rte_eth_tx_burst(0, 0, tx_pkts, nb_pkts); - (void)ret; - rte_pktmbuf_free(m); + /* set dir 0 or 1 client or server */ + pkt_dir_t dir = 0; + node->set_mbuf_cache_dir(dir); - return (0); -} + /* TBD repace the mac if req we should add flag */ + m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, m); -static int -test_inject_udp_pkt(){ - return (test_inject_pkt((uint8_t*)udp_pkt,sizeof(udp_pkt))); + /* set the packet as a readonly */ + node->set_cache_mbuf(m); + + m_state = TrexStatelessDpCore::STATE_TRANSMITTING; + + /* keep track */ + m_active_nodes.push_back(node); + + /* schedule */ + m_core->m_node_gen.add_node((CGenNode *)node); } void -TrexStatelessDpCore::test_inject_dummy_pkt() { - test_inject_udp_pkt(); +TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj) { + for (auto single_stream : obj->get_objects()) { + add_cont_stream(single_stream.m_pps, single_stream.m_pkt, single_stream.m_pkt_len); + } } -/*************************** - * DP core - * - **************************/ -TrexStatelessDpCore::TrexStatelessDpCore(uint8_t core_id) : m_core_id(core_id) { +void +TrexStatelessDpCore::stop_traffic() { + /* we cannot remove nodes not from the top of the queue so + for every active node - make sure next time + the scheduler invokes it, it will be free */ + for (auto node : m_active_nodes) { + dp_node_extended_info_st *opaque = (dp_node_extended_info_st *)node->get_opaque_storage(); + opaque->is_stream_active = 0; + } + m_active_nodes.clear(); + + m_state = STATE_IDLE; } /** - * main function for DP core + * handle a message from CP to DP * */ -void -TrexStatelessDpCore::run() { - printf("\nOn DP core %d\n", m_core_id); - while (true) { - test_inject_dummy_pkt(); - rte_pause(); - } +void +TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) { + msg->handle(this); + delete msg; } diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h index 4b09b752..d95f7eeb 100644 --- a/src/stateless/dp/trex_stateless_dp_core.h +++ b/src/stateless/dp/trex_stateless_dp_core.h @@ -21,23 +21,108 @@ limitations under the License. #ifndef __TREX_STATELESS_DP_CORE_H__ #define __TREX_STATELESS_DP_CORE_H__ -#include <stdint.h> +#include <vector> + +#include <msg_manager.h> +#include <pal_utl.h> + +class TrexStatelessCpToDpMsgBase; +class TrexStatelessDpStart; +class CFlowGenListPerThread; +class CGenNode; +class TrexStreamsCompiledObj; +class CGenNodeStateless; -/** - * stateless DP core object - * - */ class TrexStatelessDpCore { + public: - TrexStatelessDpCore(uint8_t core_id); + /* states */ + enum state_e { + STATE_IDLE, + STATE_TRANSMITTING + }; + + TrexStatelessDpCore(uint8_t thread_id, CFlowGenListPerThread *core); + + /** + * launch the stateless DP core code + * + */ + void start(); + + /** + * handle pkt event + * + * @author imarom (27-Oct-15) + */ + void handle_pkt_event(CGenNode *node); + + /** + * dummy traffic creator + * + * @author imarom (27-Oct-15) + * + * @param pkt + * @param pkt_len + */ + void start_traffic(TrexStreamsCompiledObj *obj); + + /** + * stop all traffic for this core + * + */ + void stop_traffic(); + + /** + * check for and handle messages from CP + * + * @author imarom (27-Oct-15) + */ + void periodic_check_for_cp_messages() { + // doing this inline for performance reasons - /* starts the DP core run */ - void run(); + /* fast path */ + if ( likely ( m_ring_from_cp->isEmpty() ) ) { + return; + } + + while ( true ) { + CGenNode * node = NULL; + if (m_ring_from_cp->Dequeue(node) != 0) { + break; + } + assert(node); + + TrexStatelessCpToDpMsgBase * msg = (TrexStatelessCpToDpMsgBase *)node; + handle_cp_msg(msg); + } + + } private: - void test_inject_dummy_pkt(); - uint8_t m_core_id; + /** + * handles a CP to DP message + * + * @author imarom (27-Oct-15) + * + * @param msg + */ + void handle_cp_msg(TrexStatelessCpToDpMsgBase *msg); + + void add_cont_stream(double pps, const uint8_t *pkt, uint16_t pkt_len); + + uint8_t m_thread_id; + state_e m_state; + CNodeRing *m_ring_from_cp; + CNodeRing *m_ring_to_cp; + + /* holds the current active nodes */ + std::vector<CGenNodeStateless *> m_active_nodes; + + /* pointer to the main object */ + CFlowGenListPerThread *m_core; }; #endif /* __TREX_STATELESS_DP_CORE_H__ */ + diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp new file mode 100644 index 00000000..3c6a5933 --- /dev/null +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -0,0 +1,53 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#include <trex_stateless_messaging.h> +#include <trex_stateless_dp_core.h> +#include <trex_streams_compiler.h> +#include <string.h> + +/************************* + start traffic message + ************************/ +TrexStatelessDpStart::TrexStatelessDpStart(TrexStreamsCompiledObj *obj) : m_obj(obj) { +} + +TrexStatelessDpStart::~TrexStatelessDpStart() { + if (m_obj) { + delete m_obj; + } +} + +bool +TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) { + + dp_core->start_traffic(m_obj); + return true; +} + +/************************* + stop traffic message + ************************/ +bool +TrexStatelessDpStop::handle(TrexStatelessDpCore *dp_core) { + dp_core->stop_traffic(); + return true; +} + diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h new file mode 100644 index 00000000..13f6c05a --- /dev/null +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -0,0 +1,79 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_STATELESS_MESSAGING_H__ +#define __TREX_STATELESS_MESSAGING_H__ + +#include <msg_manager.h> + +class TrexStatelessDpCore; +class TrexStreamsCompiledObj; + +/** + * defines the base class for CP to DP messages + * + * @author imarom (27-Oct-15) + */ +class TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessCpToDpMsgBase() { + } + + virtual ~TrexStatelessCpToDpMsgBase() { + } + + /** + * virtual function to handle a message + * + */ + virtual bool handle(TrexStatelessDpCore *dp_core) = 0; +}; + +/** + * a message to start traffic + * + * @author imarom (27-Oct-15) + */ +class TrexStatelessDpStart : public TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessDpStart(TrexStreamsCompiledObj *obj); + + ~TrexStatelessDpStart(); + + virtual bool handle(TrexStatelessDpCore *dp_core); + +private: + TrexStreamsCompiledObj *m_obj; +}; + +/** + * a message to stop traffic + * + * @author imarom (27-Oct-15) + */ +class TrexStatelessDpStop : public TrexStatelessCpToDpMsgBase { +public: + virtual bool handle(TrexStatelessDpCore *dp_core); +}; + + +#endif /* __TREX_STATELESS_MESSAGING_H__ */ |