diff options
author | 2015-12-13 17:18:02 +0200 | |
---|---|---|
committer | 2015-12-13 17:18:02 +0200 | |
commit | 9738e267d806223ee25e013b5959ccac26c1a14a (patch) | |
tree | 590c8f329f2ab68c7da3f1f8f4c55f81243a08bc /src/stateless | |
parent | a573adc6395c9ad8d96978508a07a654ef48c7a9 (diff) | |
parent | 301341ddb1bf17387d7fea19667bedd40fce4509 (diff) |
Merge branch 'master' into get_logs_and_version
Diffstat (limited to 'src/stateless')
-rw-r--r-- | src/stateless/cp/trex_dp_port_events.cpp | 220 | ||||
-rw-r--r-- | src/stateless/cp/trex_dp_port_events.h | 171 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless.cpp | 164 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless.h | 83 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.cpp | 607 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.h | 396 | ||||
-rw-r--r-- | src/stateless/cp/trex_stream.cpp | 93 | ||||
-rw-r--r-- | src/stateless/cp/trex_stream.h | 199 | ||||
-rw-r--r-- | src/stateless/cp/trex_streams_compiler.cpp | 746 | ||||
-rw-r--r-- | src/stateless/cp/trex_streams_compiler.h | 229 | ||||
-rw-r--r-- | src/stateless/dp/trex_stateless_dp_core.cpp | 689 | ||||
-rw-r--r-- | src/stateless/dp/trex_stateless_dp_core.h | 260 | ||||
-rw-r--r-- | src/stateless/dp/trex_stream_node.h | 284 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.cpp | 191 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.h | 319 |
15 files changed, 4130 insertions, 521 deletions
diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp new file mode 100644 index 00000000..ba327e59 --- /dev/null +++ b/src/stateless/cp/trex_dp_port_events.cpp @@ -0,0 +1,220 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include <trex_dp_port_events.h> +#include <sstream> +#include <os_time.h> +#include <trex_stateless.h> + +/** + * port events + */ +void +TrexDpPortEvents::create(TrexStatelessPort *port) { + m_port = port; + + for (int i = 0; i < TrexDpPortEvent::EVENT_MAX; i++) { + m_events[i].create((TrexDpPortEvent::event_e) i, port); + } + + m_event_id_counter = EVENT_ID_INVALID; +} + +/** + * generate a new event ID + * + */ +int +TrexDpPortEvents::generate_event_id() { + return (++m_event_id_counter); +} + +/** + * mark the next allowed event + * all other events will be disabled + * + */ +void +TrexDpPortEvents::wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms) { + + /* first disable all events */ + for (TrexDpPortEvent & e : m_events) { + e.disable(); + } + + /* mark this event as allowed */ + m_events[ev].wait_for_event(event_id, timeout_ms); +} + +void +TrexDpPortEvents::disable(TrexDpPortEvent::event_e ev) { + m_events[ev].disable(); +} + +/** + * handle an event + * + */ +void +TrexDpPortEvents::handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id) { + m_events[ev].handle_event(thread_id, event_id); +} + +/*********** + * single event object + * + */ + +void +TrexDpPortEvent::create(event_e type, TrexStatelessPort *port) { + m_event_type = type; + m_port = port; + + /* add the core ids to the hash */ + m_signal.clear(); + for (int core_id : m_port->get_core_id_list()) { + m_signal[core_id] = false; + } + + /* event is disabled */ + disable(); +} + + +/** + * wait the event using event id and timeout + * + */ +void +TrexDpPortEvent::wait_for_event(int event_id, int timeout_ms) { + + /* set a new event id */ + m_event_id = event_id; + + /* do we have a timeout ? */ + if (timeout_ms > 0) { + m_expire_limit_ms = os_get_time_msec() + timeout_ms; + } else { + m_expire_limit_ms = -1; + } + + /* prepare the signal array */ + m_pending_cnt = 0; + for (auto & core_pair : m_signal) { + core_pair.second = false; + m_pending_cnt++; + } +} + +void +TrexDpPortEvent::disable() { + m_event_id = TrexDpPortEvents::EVENT_ID_INVALID; +} + +/** + * get the event status + * + */ + +TrexDpPortEvent::event_status_e +TrexDpPortEvent::status() { + + /* is it even active ? */ + if (m_event_id == TrexDpPortEvents::EVENT_ID_INVALID) { + return (EVENT_DISABLE); + } + + /* did it occured ? */ + if (m_pending_cnt == 0) { + return (EVENT_OCCURED); + } + + /* so we are enabled and the event did not occur - maybe we timed out ? */ + if ( (m_expire_limit_ms > 0) && (os_get_time_msec() > m_expire_limit_ms) ) { + return (EVENT_TIMED_OUT); + } + + /* so we are still waiting... */ + return (EVENT_PENDING); + +} + +void +TrexDpPortEvent::err(int thread_id, int event_id, const std::string &err_msg) { + std::stringstream err; + err << "DP event '" << event_name(m_event_type) << "' on thread id '" << thread_id << "' with key '" << event_id <<"' - "; +} + +/** + * event occured + * + */ +void +TrexDpPortEvent::handle_event(int thread_id, int event_id) { + + /* if the event is disabled - we don't care */ + if (!is_active()) { + return; + } + + /* check the event id is matching the required event - if not maybe its an old signal */ + if (event_id != m_event_id) { + return; + } + + /* mark sure no double signal */ + if (m_signal.at(thread_id)) { + err(thread_id, event_id, "double signal"); + + } else { + /* mark */ + m_signal.at(thread_id) = true; + m_pending_cnt--; + } + + /* event occured */ + if (m_pending_cnt == 0) { + m_port->on_dp_event_occured(m_event_type); + m_event_id = TrexDpPortEvents::EVENT_ID_INVALID; + } +} + +bool +TrexDpPortEvent::is_active() { + return (status() != EVENT_DISABLE); +} + +bool +TrexDpPortEvent::has_timeout_expired() { + return (status() == EVENT_TIMED_OUT); +} + +const char * +TrexDpPortEvent::event_name(event_e type) { + switch (type) { + case EVENT_STOP: + return "DP STOP"; + + default: + throw TrexException("unknown event type"); + } + +} diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h new file mode 100644 index 00000000..557e590b --- /dev/null +++ b/src/stateless/cp/trex_dp_port_events.h @@ -0,0 +1,171 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_DP_PORT_EVENTS_H__ +#define __TREX_DP_PORT_EVENTS_H__ + +#include <unordered_map> +#include <string> + +class TrexStatelessPort; + +/** + * describes a single DP event related to port + * + * @author imarom (18-Nov-15) + */ +class TrexDpPortEvent { +public: + + enum event_e { + EVENT_STOP = 1, + EVENT_MAX + }; + + /** + * status of the event for the port + */ + enum event_status_e { + EVENT_DISABLE, + EVENT_PENDING, + EVENT_TIMED_OUT, + EVENT_OCCURED + }; + + /** + * init for the event + * + */ + void create(event_e type, TrexStatelessPort *port); + + /** + * create a new pending event + * + */ + void wait_for_event(int event_id, int timeout_ms = -1); + + /** + * mark event as not allowed to happen + * + */ + void disable(); + + /** + * get the event status + * + */ + event_status_e status(); + + /** + * event occured + * + */ + void handle_event(int thread_id, int event_id); + + /** + * returns true if event is active + * + */ + bool is_active(); + + /** + * has timeout already expired ? + * + */ + bool has_timeout_expired(); + + /** + * generate error + * + */ + void err(int thread_id, int event_id, const std::string &err_msg); + + /** + * event to name + * + */ + static const char * event_name(event_e type); + + +private: + + event_e m_event_type; + std::unordered_map<int, bool> m_signal; + int m_pending_cnt; + + TrexStatelessPort *m_port; + int m_event_id; + int m_expire_limit_ms; + +}; + +/** + * all the events related to a port + * + */ +class TrexDpPortEvents { +public: + friend class TrexDpPortEvent; + + void create(TrexStatelessPort *port); + + /** + * generate a new event ID to be used with wait_for_event + * + */ + int generate_event_id(); + + /** + * wait a new DP event on the port + * returns a key which will be used to identify + * the event happened + * + * @author imarom (18-Nov-15) + * + * @param ev - type of event + * @param event_id - a unique identifier for the event + * @param timeout_ms - does it has a timeout ? + * + */ + void wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms = -1); + + /** + * disable an event (don't care) + * + */ + void disable(TrexDpPortEvent::event_e ev); + + /** + * event has occured + * + */ + void handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id); + +private: + static const int EVENT_ID_INVALID = -1; + + TrexDpPortEvent m_events[TrexDpPortEvent::EVENT_MAX]; + int m_event_id_counter; + + TrexStatelessPort *m_port; + +}; + +#endif /* __TREX_DP_PORT_EVENTS_H__ */ diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index 72762e26..a4522837 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -31,55 +31,60 @@ using namespace std; * Trex stateless object * **********************************************************/ -TrexStateless::TrexStateless() { - m_is_configured = false; -} - /** - * configure the singleton stateless object * */ -void TrexStateless::configure(const TrexStatelessCfg &cfg) { - - TrexStateless& instance = get_instance_internal(); - - /* check status */ - if (instance.m_is_configured) { - throw TrexException("re-configuration of stateless object is not allowed"); - } +TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) { /* create RPC servers */ /* set both servers to mutex each other */ - instance.m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_rpc_async_cfg, &instance.m_global_cp_lock); - instance.m_rpc_server->set_verbose(cfg.m_rpc_server_verbose); + m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_rpc_async_cfg, &m_global_cp_lock); + m_rpc_server->set_verbose(cfg.m_rpc_server_verbose); /* configure ports */ + m_port_count = cfg.m_port_count; - instance.m_port_count = cfg.m_port_count; - - for (int i = 0; i < instance.m_port_count; i++) { - instance.m_ports.push_back(new TrexStatelessPort(i)); + for (int i = 0; i < m_port_count; i++) { + m_ports.push_back(new TrexStatelessPort(i, cfg.m_platform_api)); } - /* cores */ - instance.m_dp_core_count = cfg.m_dp_core_count; - for (int i = 0; i < instance.m_dp_core_count; i++) { - instance.m_dp_cores.push_back(new TrexStatelessDpCore(i)); + m_platform_api = cfg.m_platform_api; + m_publisher = cfg.m_publisher; + +} + +/** + * release all memory + * + * @author imarom (08-Oct-15) + */ +TrexStateless::~TrexStateless() { + + /* release memory for ports */ + for (auto port : m_ports) { + delete port; } + m_ports.clear(); - /* done */ - instance.m_is_configured = true; + /* stops the RPC server */ + m_rpc_server->stop(); + delete m_rpc_server; + + m_rpc_server = NULL; + + delete m_platform_api; + m_platform_api = NULL; } + /** * starts the control plane side * */ void TrexStateless::launch_control_plane() { - //std::cout << "\n on control/master core \n"; /* pin this process to the current running CPU any new thread will be called on the same CPU @@ -94,39 +99,6 @@ TrexStateless::launch_control_plane() { m_rpc_server->start(); } -void -TrexStateless::launch_on_dp_core(uint8_t core_id) { - m_dp_cores[core_id - 1]->run(); -} - -/** - * destroy the singleton and release all memory - * - * @author imarom (08-Oct-15) - */ -void -TrexStateless::destroy() { - TrexStateless& instance = get_instance_internal(); - - if (!instance.m_is_configured) { - return; - } - - /* release memory for ports */ - for (auto port : instance.m_ports) { - delete port; - } - instance.m_ports.clear(); - - /* stops the RPC server */ - instance.m_rpc_server->stop(); - delete instance.m_rpc_server; - - instance.m_rpc_server = NULL; - - /* done */ - instance.m_is_configured = false; -} /** * fetch a port by ID @@ -148,57 +120,32 @@ TrexStateless::get_port_count() { uint8_t TrexStateless::get_dp_core_count() { - return m_dp_core_count; -} - -void -TrexStateless::update_stats() { - - /* update CPU util. - TODO - */ - m_stats.m_stats.m_cpu_util = 0; - - /* for every port update and accumulate */ - for (uint8_t i = 0; i < m_port_count; i++) { - m_ports[i]->update_stats(); - - const TrexPortStats & port_stats = m_ports[i]->get_stats(); - - m_stats.m_stats.m_tx_bps += port_stats.m_stats.m_tx_bps; - m_stats.m_stats.m_rx_bps += port_stats.m_stats.m_rx_bps; - - m_stats.m_stats.m_tx_pps += port_stats.m_stats.m_tx_pps; - m_stats.m_stats.m_rx_pps += port_stats.m_stats.m_rx_pps; - - m_stats.m_stats.m_total_tx_pkts += port_stats.m_stats.m_total_tx_pkts; - m_stats.m_stats.m_total_rx_pkts += port_stats.m_stats.m_total_rx_pkts; - - m_stats.m_stats.m_total_tx_bytes += port_stats.m_stats.m_total_tx_bytes; - m_stats.m_stats.m_total_rx_bytes += port_stats.m_stats.m_total_rx_bytes; - - m_stats.m_stats.m_tx_rx_errors += port_stats.m_stats.m_tx_rx_errors; - } + return m_platform_api->get_dp_core_count(); } void TrexStateless::encode_stats(Json::Value &global) { - global["cpu_util"] = m_stats.m_stats.m_cpu_util; + const TrexPlatformApi *api = get_stateless_obj()->get_platform_api(); + + TrexPlatformGlobalStats stats; + api->get_global_stats(stats); - global["tx_bps"] = m_stats.m_stats.m_tx_bps; - global["rx_bps"] = m_stats.m_stats.m_rx_bps; + global["cpu_util"] = stats.m_stats.m_cpu_util; - global["tx_pps"] = m_stats.m_stats.m_tx_pps; - global["rx_pps"] = m_stats.m_stats.m_rx_pps; + global["tx_bps"] = stats.m_stats.m_tx_bps; + global["rx_bps"] = stats.m_stats.m_rx_bps; - global["total_tx_pkts"] = Json::Value::UInt64(m_stats.m_stats.m_total_tx_pkts); - global["total_rx_pkts"] = Json::Value::UInt64(m_stats.m_stats.m_total_rx_pkts); + global["tx_pps"] = stats.m_stats.m_tx_pps; + global["rx_pps"] = stats.m_stats.m_rx_pps; - global["total_tx_bytes"] = Json::Value::UInt64(m_stats.m_stats.m_total_tx_bytes); - global["total_rx_bytes"] = Json::Value::UInt64(m_stats.m_stats.m_total_rx_bytes); + global["total_tx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_tx_pkts); + global["total_rx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_rx_pkts); - global["tx_rx_errors"] = Json::Value::UInt64(m_stats.m_stats.m_tx_rx_errors); + global["total_tx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_tx_bytes); + global["total_rx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_rx_bytes); + + global["tx_rx_errors"] = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors); for (uint8_t i = 0; i < m_port_count; i++) { std::stringstream ss; @@ -210,3 +157,20 @@ TrexStateless::encode_stats(Json::Value &global) { } } +/** + * generate a snapshot for publish (async publish) + * + */ +void +TrexStateless::generate_publish_snapshot(std::string &snapshot) { + Json::FastWriter writer; + Json::Value root; + + root["name"] = "trex-stateless-info"; + root["type"] = 0; + + /* stateless specific info goes here */ + root["data"] = Json::nullValue; + + snapshot = writer.write(root); +} diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index 649b25dd..5c11be1e 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -29,8 +29,10 @@ limitations under the License. #include <trex_stream.h> #include <trex_stateless_port.h> -#include <trex_stateless_dp_core.h> #include <trex_rpc_server_api.h> +#include <publisher/trex_publisher.h> + +#include <internal_api/trex_platform_api.h> /** * generic exception for errors @@ -88,17 +90,19 @@ public: /* default values */ TrexStatelessCfg() { m_port_count = 0; - m_dp_core_count = 0; m_rpc_req_resp_cfg = NULL; m_rpc_async_cfg = NULL; - m_rpc_server_verbose = false; + m_rpc_server_verbose = false; + m_platform_api = NULL; + m_publisher = NULL; } const TrexRpcServerConfig *m_rpc_req_resp_cfg; const TrexRpcServerConfig *m_rpc_async_cfg; + const TrexPlatformApi *m_platform_api; bool m_rpc_server_verbose; uint8_t m_port_count; - uint8_t m_dp_core_count; + TrexPublisher *m_publisher; }; /** @@ -113,27 +117,8 @@ public: * reconfiguration is not allowed * an exception will be thrown */ - static void configure(const TrexStatelessCfg &cfg); - - /** - * destroy the instance - * - */ - static void destroy(); - - /** - * singleton public get instance - * - */ - static TrexStateless& get_instance() { - TrexStateless& instance = get_instance_internal(); - - if (!instance.m_is_configured) { - throw TrexException("object is not configured"); - } - - return instance; - } + TrexStateless(const TrexStatelessCfg &cfg); + ~TrexStateless(); /** * starts the control plane side @@ -152,12 +137,6 @@ public: uint8_t get_dp_core_count(); - /** - * update all the stats (deep update) - * (include all the ports and global stats) - * - */ - void update_stats(); /** * fetch all the stats @@ -165,22 +144,29 @@ public: */ void encode_stats(Json::Value &global); + /** + * generate a snapshot for publish + */ + void generate_publish_snapshot(std::string &snapshot); -protected: - TrexStateless(); + const TrexPlatformApi * get_platform_api() { + return (m_platform_api); + } - static TrexStateless& get_instance_internal () { - static TrexStateless instance; - return instance; + TrexPublisher * get_publisher() { + return m_publisher; } - /* c++ 2011 style singleton */ + const std::vector <TrexStatelessPort *> get_port_list() { + return m_ports; + } + +protected: + + /* no copy or assignment */ TrexStateless(TrexStateless const&) = delete; void operator=(TrexStateless const&) = delete; - /* status */ - bool m_is_configured; - /* RPC server array */ TrexRpcServer *m_rpc_server; @@ -188,15 +174,22 @@ protected: std::vector <TrexStatelessPort *> m_ports; uint8_t m_port_count; - /* cores */ - std::vector <TrexStatelessDpCore *> m_dp_cores; - uint8_t m_dp_core_count; + /* platform API */ + const TrexPlatformApi *m_platform_api; - /* stats */ - TrexStatelessStats m_stats; + TrexPublisher *m_publisher; std::mutex m_global_cp_lock; }; +/** + * an anchor function + * + * @author imarom (25-Oct-15) + * + * @return TrexStateless& + */ +TrexStateless * get_stateless_obj(); + #endif /* __TREX_STATELESS_H__ */ diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index a31847a5..9770c735 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -18,211 +18,600 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + #include <trex_stateless.h> #include <trex_stateless_port.h> +#include <trex_stateless_messaging.h> +#include <trex_streams_compiler.h> + #include <string> #ifndef TREX_RPC_MOCK_SERVER + // DPDK c++ issue -#define UINT8_MAX 255 -#define UINT16_MAX 0xFFFF +#ifndef UINT8_MAX + #define UINT8_MAX 255 +#endif + +#ifndef UINT16_MAX + #define UINT16_MAX 0xFFFF +#endif + // DPDK c++ issue #endif #include <rte_ethdev.h> #include <os_time.h> +void +port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list); + using namespace std; /*************************** * trex stateless port * **************************/ -TrexStatelessPort::TrexStatelessPort(uint8_t port_id) : m_port_id(port_id) { - m_port_state = PORT_STATE_UP_IDLE; - clear_owner(); +TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) { + std::vector<std::pair<uint8_t, uint8_t>> core_pair_list; + + m_port_id = port_id; + m_port_state = PORT_STATE_IDLE; + + /* get the platform specific data */ + api->get_interface_info(port_id, m_driver_name, m_speed); + + /* get the DP cores belonging to this port */ + api->port_id_to_cores(m_port_id, core_pair_list); + + for (auto core_pair : core_pair_list) { + + /* send the core id */ + m_cores_id_list.push_back(core_pair.first); + } + + /* init the events DP DB */ + m_dp_events.create(this); } /** - * starts the traffic on the port + * acquire the port * + * @author imarom (09-Nov-15) + * + * @param user + * @param force */ -TrexStatelessPort::rc_e -TrexStatelessPort::start_traffic(void) { +void +TrexStatelessPort::acquire(const std::string &user, bool force) { - if (m_port_state != PORT_STATE_UP_IDLE) { - return (RC_ERR_BAD_STATE_FOR_OP); + /* if port is free - just take it */ + if (get_owner().is_free()) { + get_owner().own(user); + return; } - if (get_stream_table()->size() == 0) { - return (RC_ERR_NO_STREAMS); + if (force) { + get_owner().own(user); + + /* inform the other client of the steal... */ + Json::Value data; + data["port_id"] = m_port_id; + get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_FORCE_ACQUIRED, data); + + } else { + /* not same user or session id and not force - report error */ + if (get_owner().get_name() == user) { + throw TrexRpcException("port is already owned by another session of '" + user + "'"); + } else { + throw TrexRpcException("port is already taken by '" + get_owner().get_name() + "'"); + } } - m_port_state = PORT_STATE_TRANSMITTING; +} - /* real code goes here */ - return (RC_OK); +void +TrexStatelessPort::release(void) { + get_owner().release(); } -void -TrexStatelessPort::stop_traffic(void) { +/** + * starts the traffic on the port + * + */ +void +TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration) { + + /* command allowed only on state stream */ + verify_state(PORT_STATE_STREAMS); + + /* just making sure no leftovers... */ + delete_streams_graph(); + + /* on start - we can only provide absolute values */ + assert(mul.m_op == TrexPortMultiplier::OP_ABS); + + double factor = calculate_effective_factor(mul); + + /* fetch all the streams from the table */ + vector<TrexStream *> streams; + get_object_list(streams); - /* real code goes here */ - if (m_port_state == PORT_STATE_TRANSMITTING) { - m_port_state = PORT_STATE_UP_IDLE; + + /* compiler it */ + std::vector<TrexStreamsCompiledObj *> compiled_objs; + std::string fail_msg; + + TrexStreamsCompiler compiler; + bool rc = compiler.compile(m_port_id, + streams, + compiled_objs, + get_dp_core_count(), + factor, + &fail_msg); + if (!rc) { + throw TrexRpcException(fail_msg); } + + /* generate a message to all the relevant DP cores to start transmitting */ + int event_id = m_dp_events.generate_event_id(); + + /* mark that DP event of stoppped is possible */ + m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, event_id); + + + /* update object status */ + m_factor = factor; + m_last_all_streams_continues = compiled_objs[0]->get_all_streams_continues(); + m_last_duration = duration; + change_state(PORT_STATE_TX); + + + /* update the DP - messages will be freed by the DP */ + int index = 0; + for (auto core_id : m_cores_id_list) { + + TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_objs[index], duration); + send_message_to_dp(core_id, start_msg); + + index++; + } + + /* update subscribers */ + Json::Value data; + data["port_id"] = m_port_id; + get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data); + } + /** -* access the stream table -* -*/ -TrexStreamTable * TrexStatelessPort::get_stream_table() { - return &m_stream_table; + * stop traffic on port + * + * @author imarom (09-Nov-15) + * + * @return TrexStatelessPort::rc_e + */ +void +TrexStatelessPort::stop_traffic(void) { + + if (!( (m_port_state == PORT_STATE_TX) + || (m_port_state == PORT_STATE_PAUSE) )) { + return; + } + + /* delete any previous graphs */ + delete_streams_graph(); + + /* mask out the DP stop event */ + m_dp_events.disable(TrexDpPortEvent::EVENT_STOP); + + /* generate a message to all the relevant DP cores to start transmitting */ + TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id); + + send_message_to_all_dp(stop_msg); + + change_state(PORT_STATE_STREAMS); + + Json::Value data; + data["port_id"] = m_port_id; + get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data); + +} + +void +TrexStatelessPort::pause_traffic(void) { + + verify_state(PORT_STATE_TX); + + if (m_last_all_streams_continues == false) { + throw TrexRpcException(" pause is supported when all streams are in continues mode "); + } + + if ( m_last_duration>0.0 ) { + throw TrexRpcException(" pause is supported when duration is not enable is start command "); + } + + TrexStatelessCpToDpMsgBase *pause_msg = new TrexStatelessDpPause(m_port_id); + + send_message_to_all_dp(pause_msg); + + change_state(PORT_STATE_PAUSE); + + Json::Value data; + data["port_id"] = m_port_id; + get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_PAUSED, data); +} + +void +TrexStatelessPort::resume_traffic(void) { + + verify_state(PORT_STATE_PAUSE); + + /* generate a message to all the relevant DP cores to start transmitting */ + TrexStatelessCpToDpMsgBase *resume_msg = new TrexStatelessDpResume(m_port_id); + + send_message_to_all_dp(resume_msg); + + change_state(PORT_STATE_TX); + + + Json::Value data; + data["port_id"] = m_port_id; + get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_RESUMED, data); } +void +TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul) { + + double factor; + + verify_state(PORT_STATE_TX | PORT_STATE_PAUSE); + + /* generate a message to all the relevant DP cores to start transmitting */ + double new_factor = calculate_effective_factor(mul); + + switch (mul.m_op) { + case TrexPortMultiplier::OP_ABS: + factor = new_factor / m_factor; + break; + + case TrexPortMultiplier::OP_ADD: + factor = (m_factor + new_factor) / m_factor; + break; + + case TrexPortMultiplier::OP_SUB: + factor = (m_factor - new_factor) / m_factor; + if (factor <= 0) { + throw TrexRpcException("Update request will lower traffic to less than zero"); + } + break; + + default: + assert(0); + break; + } + + TrexStatelessCpToDpMsgBase *update_msg = new TrexStatelessDpUpdate(m_port_id, factor); + + send_message_to_all_dp(update_msg); + + m_factor *= factor; + +} std::string -TrexStatelessPort::get_state_as_string() { +TrexStatelessPort::get_state_as_string() const { switch (get_state()) { case PORT_STATE_DOWN: - return "down"; + return "DOWN"; + + case PORT_STATE_IDLE: + return "IDLE"; - case PORT_STATE_UP_IDLE: - return "idle"; + case PORT_STATE_STREAMS: + return "STREAMS"; - case PORT_STATE_TRANSMITTING: - return "transmitting"; + case PORT_STATE_TX: + return "TX"; + + case PORT_STATE_PAUSE: + return "PAUSE"; } - return "unknown"; + return "UNKNOWN"; } void -TrexStatelessPort::get_properties(string &driver, string &speed) { +TrexStatelessPort::get_properties(std::string &driver, TrexPlatformApi::driver_speed_e &speed) { - /* take this from DPDK */ - driver = "e1000"; - speed = "1 Gbps"; + driver = m_driver_name; + speed = m_speed; } +bool +TrexStatelessPort::verify_state(int state, bool should_throw) const { + if ( (state & m_port_state) == 0 ) { + if (should_throw) { + throw TrexRpcException("command cannot be executed on current state: '" + get_state_as_string() + "'"); + } else { + return false; + } + } -/** - * generate a random connection handler - * - */ -std::string -TrexStatelessPort::generate_handler() { - std::stringstream ss; + return true; +} - static const char alphanum[] = - "0123456789" - "ABCDEFGHIJKLMNOPQRSTUVWXYZ" - "abcdefghijklmnopqrstuvwxyz"; +void +TrexStatelessPort::change_state(port_state_e new_state) { - /* generate 8 bytes of random handler */ - for (int i = 0; i < 8; ++i) { - ss << alphanum[rand() % (sizeof(alphanum) - 1)]; + m_port_state = new_state; +} + + +void +TrexStatelessPort::encode_stats(Json::Value &port) { + + const TrexPlatformApi *api = get_stateless_obj()->get_platform_api(); + + TrexPlatformInterfaceStats stats; + api->get_interface_stats(m_port_id, stats); + + port["tx_bps"] = stats.m_stats.m_tx_bps; + port["rx_bps"] = stats.m_stats.m_rx_bps; + + port["tx_pps"] = stats.m_stats.m_tx_pps; + port["rx_pps"] = stats.m_stats.m_rx_pps; + + port["total_tx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_tx_pkts); + port["total_rx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_rx_pkts); + + port["total_tx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_tx_bytes); + port["total_rx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_rx_bytes); + + port["tx_rx_errors"] = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors); +} + +void +TrexStatelessPort::send_message_to_all_dp(TrexStatelessCpToDpMsgBase *msg) { + + for (auto core_id : m_cores_id_list) { + send_message_to_dp(core_id, msg->clone()); } - return (ss.str()); + /* original was not sent - delete it */ + delete msg; +} + +void +TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg) { + + /* send the message to the core */ + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id); + ring->Enqueue((CGenNode *)msg); } /** - * update stats for the port + * when a DP (async) event occurs - handle it * */ void -TrexStatelessPort::update_stats() { - struct rte_eth_stats stats; - rte_eth_stats_get(m_port_id, &stats); +TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) { + Json::Value data; - /* copy straight values */ - m_stats.m_stats.m_total_tx_bytes = stats.obytes; - m_stats.m_stats.m_total_rx_bytes = stats.ibytes; + switch (event_type) { - m_stats.m_stats.m_total_tx_pkts = stats.opackets; - m_stats.m_stats.m_total_rx_pkts = stats.ipackets; + case TrexDpPortEvent::EVENT_STOP: + /* set a stop event */ + change_state(PORT_STATE_STREAMS); + /* send a ZMQ event */ - /* calculate stats */ - m_stats.m_stats.m_tx_bps = m_stats.m_bw_tx_bps.add(stats.obytes); - m_stats.m_stats.m_rx_bps = m_stats.m_bw_rx_bps.add(stats.ibytes); + data["port_id"] = m_port_id; + get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_FINISHED_TX, data); + break; - m_stats.m_stats.m_tx_pps = m_stats.m_bw_tx_pps.add(stats.opackets); - m_stats.m_stats.m_rx_pps = m_stats.m_bw_rx_pps.add(stats.ipackets); + default: + assert(0); + } } -const TrexPortStats & -TrexStatelessPort::get_stats() { - return m_stats; +uint64_t +TrexStatelessPort::get_port_speed_bps() const { + switch (m_speed) { + case TrexPlatformApi::SPEED_1G: + return (1LLU * 1000 * 1000 * 1000); + + case TrexPlatformApi::SPEED_10G: + return (10LLU * 1000 * 1000 * 1000); + + case TrexPlatformApi::SPEED_40G: + return (40LLU * 1000 * 1000 * 1000); + + default: + return 0; + } } +double +TrexStatelessPort::calculate_effective_factor(const TrexPortMultiplier &mul) { + + /* for a simple factor request */ + if (mul.m_type == TrexPortMultiplier::MUL_FACTOR) { + return (mul.m_value); + } + + /* we now need the graph - generate it if we don't have it (happens once) */ + if (!m_graph_obj) { + generate_streams_graph(); + } + + switch (mul.m_type) { + case TrexPortMultiplier::MUL_BPS: + return (mul.m_value / m_graph_obj->get_max_bps()); + + case TrexPortMultiplier::MUL_PPS: + return (mul.m_value / m_graph_obj->get_max_pps()); + + case TrexPortMultiplier::MUL_PERCENTAGE: + /* if abs percentage is from the line speed - otherwise its from the current speed */ + + if (mul.m_op == TrexPortMultiplier::OP_ABS) { + double required = (mul.m_value / 100.0) * get_port_speed_bps(); + return (required / m_graph_obj->get_max_bps()); + } else { + return (m_factor * (mul.m_value / 100.0)); + } + + default: + assert(0); + } + +} + + void -TrexStatelessPort::encode_stats(Json::Value &port) { +TrexStatelessPort::generate_streams_graph() { - port["tx_bps"] = m_stats.m_stats.m_tx_bps; - port["rx_bps"] = m_stats.m_stats.m_rx_bps; + /* dispose of the old one */ + if (m_graph_obj) { + delete_streams_graph(); + } - port["tx_pps"] = m_stats.m_stats.m_tx_pps; - port["rx_pps"] = m_stats.m_stats.m_rx_pps; + /* fetch all the streams from the table */ + vector<TrexStream *> streams; + get_object_list(streams); - port["total_tx_pkts"] = Json::Value::UInt64(m_stats.m_stats.m_total_tx_pkts); - port["total_rx_pkts"] = Json::Value::UInt64(m_stats.m_stats.m_total_rx_pkts); + TrexStreamsGraph graph; + m_graph_obj = graph.generate(streams); +} - port["total_tx_bytes"] = Json::Value::UInt64(m_stats.m_stats.m_total_tx_bytes); - port["total_rx_bytes"] = Json::Value::UInt64(m_stats.m_stats.m_total_rx_bytes); - - port["tx_rx_errors"] = Json::Value::UInt64(m_stats.m_stats.m_tx_rx_errors); +void +TrexStatelessPort::delete_streams_graph() { + if (m_graph_obj) { + delete m_graph_obj; + m_graph_obj = NULL; + } } /*************************** - * BW measurement + * port multiplier * **************************/ -/* TODO: move this to a common place */ -BWMeasure::BWMeasure() { - reset(); -} +const std::initializer_list<std::string> TrexPortMultiplier::g_types = {"raw", "bps", "pps", "percentage"}; +const std::initializer_list<std::string> TrexPortMultiplier::g_ops = {"abs", "add", "sub"}; + +TrexPortMultiplier:: +TrexPortMultiplier(const std::string &type_str, const std::string &op_str, double value) { + mul_type_e type; + mul_op_e op; + + if (type_str == "raw") { + type = MUL_FACTOR; + + } else if (type_str == "bps") { + type = MUL_BPS; -void BWMeasure::reset(void) { - m_start=false; - m_last_time_msec=0; - m_last_bytes=0; - m_last_result=0.0; -}; + } else if (type_str == "pps") { + type = MUL_PPS; + + } else if (type_str == "percentage") { + type = MUL_PERCENTAGE; + } else { + throw TrexException("bad type str: " + type_str); + } + + if (op_str == "abs") { + op = OP_ABS; + + } else if (op_str == "add") { + op = OP_ADD; + + } else if (op_str == "sub") { + op = OP_SUB; + + } else { + throw TrexException("bad op str: " + op_str); + } + + m_type = type; + m_op = op; + m_value = value; -double BWMeasure::calc_MBsec(uint32_t dtime_msec, - uint64_t dbytes){ - double rate=0.000008*( ( (double)dbytes*(double)os_get_time_freq())/((double)dtime_msec) ); - return(rate); } -double BWMeasure::add(uint64_t size) { - if ( false == m_start ) { - m_start=true; - m_last_time_msec = os_get_time_msec() ; - m_last_bytes=size; - return(0.0); +const TrexStreamsGraphObj * +TrexStatelessPort::validate(void) { + + /* first compile the graph */ + + vector<TrexStream *> streams; + get_object_list(streams); + + if (streams.size() == 0) { + throw TrexException("no streams attached to port"); + } + + TrexStreamsCompiler compiler; + std::vector<TrexStreamsCompiledObj *> compiled_objs; + + std::string fail_msg; + bool rc = compiler.compile(m_port_id, + streams, + compiled_objs, + get_dp_core_count(), + 1.0, + &fail_msg); + if (!rc) { + throw TrexException(fail_msg); + } + + for (auto obj : compiled_objs) { + delete obj; } - uint32_t ctime=os_get_time_msec(); - if ((ctime - m_last_time_msec) <os_get_time_freq() ) { - return(m_last_result); + /* now create a stream graph */ + if (!m_graph_obj) { + generate_streams_graph(); } - uint32_t dtime_msec = ctime-m_last_time_msec; - uint64_t dbytes = size - m_last_bytes; + return m_graph_obj; +} - m_last_time_msec = ctime; - m_last_bytes = size; - m_last_result= 0.5*calc_MBsec(dtime_msec,dbytes) +0.5*(m_last_result); - return( m_last_result ); +/************* Trex Port Owner **************/ + +TrexPortOwner::TrexPortOwner() { + m_is_free = true; + + /* for handlers random generation */ + srand(time(NULL)); } +/** + * generate a random connection handler + * + */ +std::string +TrexPortOwner::generate_handler() { + std::stringstream ss; + + static const char alphanum[] = + "0123456789" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz"; + + /* generate 8 bytes of random handler */ + for (int i = 0; i < 8; ++i) { + ss << alphanum[rand() % (sizeof(alphanum) - 1)]; + } + + return (ss.str()); +} +const std::string TrexPortOwner::g_unowned_name = "<FREE>"; +const std::string TrexPortOwner::g_unowned_handler = ""; diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index 428d5aee..4988b46a 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -22,87 +22,103 @@ limitations under the License. #define __TREX_STATELESS_PORT_H__ #include <trex_stream.h> - -/** - * bandwidth measurement class +#include <trex_dp_port_events.h> +#include <internal_api/trex_platform_api.h> + +class TrexStatelessCpToDpMsgBase; +class TrexStreamsGraphObj; +class TrexPortMultiplier; + +/** + * TRex port owner can perform + * write commands + * while port is owned - others can + * do read only commands * */ -class BWMeasure { +class TrexPortOwner { public: - BWMeasure(); - void reset(void); - double add(uint64_t size); -private: - double calc_MBsec(uint32_t dtime_msec, - uint64_t dbytes); + TrexPortOwner(); -public: - bool m_start; - uint32_t m_last_time_msec; - uint64_t m_last_bytes; - double m_last_result; -}; + /** + * is port free to acquire + */ + bool is_free() { + return m_is_free; + } -/** - * TRex stateless port stats - * - * @author imarom (24-Sep-15) - */ -class TrexPortStats { + void release() { + m_is_free = true; + m_owner_name = ""; + m_handler = ""; + } -public: - TrexPortStats() { - m_stats = {0}; + bool is_owned_by(const std::string &user) { + return ( !m_is_free && (m_owner_name == user) ); + } + + void own(const std::string &owner_name) { - m_bw_tx_bps.reset(); - m_bw_rx_bps.reset(); + /* save user data */ + m_owner_name = owner_name; - m_bw_tx_pps.reset(); - m_bw_rx_pps.reset(); + /* internal data */ + m_handler = generate_handler(); + m_is_free = false; } -public: + bool verify(const std::string &handler) { + return ( (!m_is_free) && (m_handler == handler) ); + } - BWMeasure m_bw_tx_bps; - BWMeasure m_bw_rx_bps; - - BWMeasure m_bw_tx_pps; - BWMeasure m_bw_rx_pps; - - struct { - - double m_tx_bps; - double m_rx_bps; - - double m_tx_pps; - double m_rx_pps; - - uint64_t m_total_tx_pkts; - uint64_t m_total_rx_pkts; - - uint64_t m_total_tx_bytes; - uint64_t m_total_rx_bytes; - - uint64_t m_tx_rx_errors; - } m_stats; + const std::string &get_name() { + return (!m_is_free ? m_owner_name : g_unowned_name); + } + + const std::string &get_handler() { + return (!m_is_free ? m_handler : g_unowned_handler); + } + + +private: + std::string generate_handler(); + + /* is this port owned by someone ? */ + bool m_is_free; + + /* user provided info */ + std::string m_owner_name; + + /* handler genereated internally */ + std::string m_handler; + + + /* just references defaults... */ + static const std::string g_unowned_name; + static const std::string g_unowned_handler; }; + /** * describes a stateless port * * @author imarom (31-Aug-15) */ class TrexStatelessPort { + friend class TrexDpPortEvent; + public: /** * port state */ enum port_state_e { - PORT_STATE_DOWN, - PORT_STATE_UP_IDLE, - PORT_STATE_TRANSMITTING + PORT_STATE_DOWN = 0x1, + PORT_STATE_IDLE = 0x2, + PORT_STATE_STREAMS = 0x4, + PORT_STATE_TX = 0x8, + PORT_STATE_PAUSE = 0x10, }; /** @@ -115,31 +131,66 @@ public: RC_ERR_FAILED_TO_COMPILE_STREAMS }; - TrexStatelessPort(uint8_t port_id); + + TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api); + + /** + * acquire port + * throws TrexException in case of an error + */ + void acquire(const std::string &user, bool force = false); + + /** + * release the port from the current user + * throws TrexException in case of an error + */ + void release(void); + + /** + * validate the state of the port before start + * it will return a stream graph + * containing information about the streams + * configured on this port + * + * on error it throws TrexException + */ + const TrexStreamsGraphObj *validate(void); /** * start traffic - * + * throws TrexException in case of an error */ - rc_e start_traffic(void); + void start_traffic(const TrexPortMultiplier &mul, double duration); /** * stop traffic - * + * throws TrexException in case of an error */ void stop_traffic(void); /** - * access the stream table + * pause traffic + * throws TrexException in case of an error + */ + void pause_traffic(void); + + /** + * resume traffic + * throws TrexException in case of an error + */ + void resume_traffic(void); + + /** + * update current traffic on port * */ - TrexStreamTable *get_stream_table(); + void update_traffic(const TrexPortMultiplier &mul); /** * get the port state * */ - port_state_e get_state() { + port_state_e get_state() const { return m_port_state; } @@ -147,7 +198,7 @@ public: * port state as string * */ - std::string get_state_as_string(); + std::string get_state_as_string() const; /** * fill up properties of the port @@ -157,74 +208,219 @@ public: * @param driver * @param speed */ - void get_properties(std::string &driver, std::string &speed); + void get_properties(std::string &driver, TrexPlatformApi::driver_speed_e &speed); + + /** - * query for ownership - * - */ - const std::string &get_owner() { - return m_owner; + * encode stats as JSON + */ + void encode_stats(Json::Value &port); + + uint8_t get_port_id() { + return m_port_id; } /** - * owner handler - * for the connection + * delegators * */ - const std::string &get_owner_handler() { - return m_owner_handler; + + void add_stream(TrexStream *stream) { + verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS); + + m_stream_table.add_stream(stream); + delete_streams_graph(); + + change_state(PORT_STATE_STREAMS); } - bool is_free_to_aquire() { - return (m_owner == "none"); + void remove_stream(TrexStream *stream) { + verify_state(PORT_STATE_STREAMS); + + m_stream_table.remove_stream(stream); + delete_streams_graph(); + + if (m_stream_table.size() == 0) { + change_state(PORT_STATE_IDLE); + } } - /** - * take ownership of the server array - * this is static - * ownership is total - * - */ - void set_owner(const std::string &owner) { - m_owner = owner; - m_owner_handler = generate_handler(); + void remove_and_delete_all_streams() { + verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS); + + m_stream_table.remove_and_delete_all_streams(); + delete_streams_graph(); + + change_state(PORT_STATE_IDLE); } - void clear_owner() { - m_owner = "none"; - m_owner_handler = ""; + TrexStream * get_stream_by_id(uint32_t stream_id) { + return m_stream_table.get_stream_by_id(stream_id); } - bool verify_owner_handler(const std::string &handler) { + void get_id_list(std::vector<uint32_t> &id_list) { + m_stream_table.get_id_list(id_list); + } - return ( (m_owner != "none") && (m_owner_handler == handler) ); + void get_object_list(std::vector<TrexStream *> &object_list) { + m_stream_table.get_object_list(object_list); + } + TrexDpPortEvents & get_dp_events() { + return m_dp_events; } + /** - * update the values of the stats + * returns the number of DP cores linked to this port * */ - void update_stats(); + uint8_t get_dp_core_count() { + return m_cores_id_list.size(); + } - const TrexPortStats & get_stats(); + /** + * returns the traffic multiplier currently being used by the DP + * + */ + double get_multiplier() { + return (m_factor); + } /** - * encode stats as JSON + * get port speed in bits per second + * */ - void encode_stats(Json::Value &port); + uint64_t get_port_speed_bps() const; + + TrexPortOwner & get_owner() { + return m_owner; + } private: + + const std::vector<int> get_core_id_list () { + return m_cores_id_list; + } + + bool verify_state(int state, bool should_throw = true) const; + + void change_state(port_state_e new_state); + std::string generate_handler(); - TrexStreamTable m_stream_table; - uint8_t m_port_id; - port_state_e m_port_state; - std::string m_owner; - std::string m_owner_handler; - TrexPortStats m_stats; + /** + * send message to all cores using duplicate + * + */ + void send_message_to_all_dp(TrexStatelessCpToDpMsgBase *msg); + + /** + * send message to specific DP core + * + */ + void send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg); + + /** + * triggered when event occurs + * + */ + void on_dp_event_occured(TrexDpPortEvent::event_e event_type); + + + /** + * calculate effective M per core + * + */ + double calculate_effective_factor(const TrexPortMultiplier &mul); + + + + /** + * generates a graph of streams graph + * + */ + void generate_streams_graph(); + + /** + * dispose of it + * + * @author imarom (26-Nov-15) + */ + void delete_streams_graph(); + + + TrexStreamTable m_stream_table; + uint8_t m_port_id; + port_state_e m_port_state; + std::string m_driver_name; + + TrexPlatformApi::driver_speed_e m_speed; + + /* holds the DP cores associated with this port */ + std::vector<int> m_cores_id_list; + + bool m_last_all_streams_continues; + double m_last_duration; + double m_factor; + + TrexDpPortEvents m_dp_events; + + /* holds a graph of streams rate*/ + const TrexStreamsGraphObj *m_graph_obj; + + /* owner information */ + TrexPortOwner m_owner; +}; + + +/** + * port multiplier object + * + */ +class TrexPortMultiplier { +public: + + + /** + * defines the type of multipler passed to start + */ + enum mul_type_e { + MUL_FACTOR, + MUL_BPS, + MUL_PPS, + MUL_PERCENTAGE + }; + + /** + * multiplier can be absolute value + * increment value or subtract value + */ + enum mul_op_e { + OP_ABS, + OP_ADD, + OP_SUB + }; + + + TrexPortMultiplier(mul_type_e type, mul_op_e op, double value) { + m_type = type; + m_op = op; + m_value = value; + } + + TrexPortMultiplier(const std::string &type_str, const std::string &op_str, double value); + + +public: + static const std::initializer_list<std::string> g_types; + static const std::initializer_list<std::string> g_ops; + + mul_type_e m_type; + mul_op_e m_op; + double m_value; }; #endif /* __TREX_STATELESS_PORT_H__ */ diff --git a/src/stateless/cp/trex_stream.cpp b/src/stateless/cp/trex_stream.cpp index 182036f1..cad603e2 100644 --- a/src/stateless/cp/trex_stream.cpp +++ b/src/stateless/cp/trex_stream.cpp @@ -20,13 +20,82 @@ limitations under the License. */ #include <trex_stream.h> #include <cstddef> +#include <string.h> +#include <assert.h> /************************************** * stream *************************************/ -TrexStream::TrexStream(uint8_t port_id, uint32_t stream_id) : m_port_id(port_id), m_stream_id(stream_id) { + + +std::string TrexStream::get_stream_type_str(stream_type_t stream_type){ + + std::string res; + + + switch (stream_type) { + + case stCONTINUOUS : + res="stCONTINUOUS "; + break; + + case stSINGLE_BURST : + res="stSINGLE_BURST "; + break; + + case stMULTI_BURST : + res="stMULTI_BURST "; + break; + default: + res="Unknow "; + }; + return(res); +} + + +void TrexStream::Dump(FILE *fd){ + + fprintf(fd,"\n"); + fprintf(fd,"==> Stream_id : %lu \n",(ulong)m_stream_id); + fprintf(fd," Enabled : %lu \n",(ulong)(m_enabled?1:0)); + fprintf(fd," Self_start : %lu \n",(ulong)(m_self_start?1:0)); + + if (m_next_stream_id>=0) { + fprintf(fd," Nex_stream_id : %lu \n",(ulong)m_next_stream_id); + }else { + fprintf(fd," Nex_stream_id : %d \n",m_next_stream_id); + } + + fprintf(fd," Port_id : %lu \n",(ulong)m_port_id); + + if (m_isg_usec>0.0) { + fprintf(fd," isg : %6.2f \n",m_isg_usec); + } + fprintf(fd," type : %s \n",get_stream_type_str(m_type).c_str()); + + if ( m_type == TrexStream::stCONTINUOUS ) { + fprintf(fd," pps : %f \n",m_pps); + } + if (m_type == TrexStream::stSINGLE_BURST) { + fprintf(fd," pps : %f \n",m_pps); + fprintf(fd," burst : %lu \n",(ulong)m_burst_total_pkts); + } + if (m_type == TrexStream::stMULTI_BURST) { + fprintf(fd," pps : %f \n",m_pps); + fprintf(fd," burst : %lu \n",(ulong)m_burst_total_pkts); + fprintf(fd," mburst : %lu \n",(ulong)m_num_bursts); + if (m_ibg_usec>0.0) { + fprintf(fd," m_ibg_usec : %f \n",m_ibg_usec); + } + } +} + + +TrexStream::TrexStream(uint8_t type, + uint8_t port_id, uint32_t stream_id) : m_port_id(port_id), m_stream_id(stream_id) { /* default values */ + m_type = type; m_isg_usec = 0; m_next_stream_id = -1; m_enabled = false; @@ -37,6 +106,11 @@ TrexStream::TrexStream(uint8_t port_id, uint32_t stream_id) : m_port_id(port_id) m_rx_check.m_enable = false; + + m_pps=-1.0; + m_burst_total_pkts=0; + m_num_bursts=1; + m_ibg_usec=0.0; } TrexStream::~TrexStream() { @@ -56,6 +130,7 @@ TrexStream::get_stream_json() { return m_stream_json; } + /************************************** * stream table *************************************/ @@ -103,14 +178,24 @@ TrexStream * TrexStreamTable::get_stream_by_id(uint32_t stream_id) { } } -void TrexStreamTable::get_stream_list(std::vector<uint32_t> &stream_list) { - stream_list.clear(); +void TrexStreamTable::get_id_list(std::vector<uint32_t> &id_list) { + id_list.clear(); + + for (auto stream : m_stream_table) { + id_list.push_back(stream.first); + } +} + +void TrexStreamTable::get_object_list(std::vector<TrexStream *> &object_list) { + object_list.clear(); for (auto stream : m_stream_table) { - stream_list.push_back(stream.first); + object_list.push_back(stream.second); } + } int TrexStreamTable::size() { return m_stream_table.size(); } + diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h index f5bc96ef..b991b05f 100644 --- a/src/stateless/cp/trex_stream.h +++ b/src/stateless/cp/trex_stream.h @@ -1,5 +1,6 @@ /* Itay Marom + Hanoch Haim Cisco Systems, Inc. */ @@ -29,22 +30,49 @@ limitations under the License. #include <json/json.h> #include <trex_stream_vm.h> +#include <stdio.h> +#include <string.h> class TrexRpcCmdAddStream; + +struct CStreamPktData { + uint8_t *binary; + uint16_t len; + + std::string meta; + +public: + inline void clone(uint8_t * in_binary, + uint32_t in_pkt_size){ + binary = new uint8_t[in_pkt_size]; + len = in_pkt_size; + memcpy(binary,in_binary,in_pkt_size); + } +}; + + /** * Stateless Stream * */ class TrexStream { - /* provide the RPC parser a way to access private fields */ - friend class TrexRpcCmdAddStream; - friend class TrexRpcCmdGetStream; - friend class TrexStreamTable; public: - TrexStream(uint8_t port_id, uint32_t stream_id); - virtual ~TrexStream() = 0; + enum STREAM_TYPE { + stNONE = 0, + stCONTINUOUS = 4, + stSINGLE_BURST = 5, + stMULTI_BURST = 6 + }; + + typedef uint8_t stream_type_t ; + + static std::string get_stream_type_str(stream_type_t stream_type); + +public: + TrexStream(uint8_t type,uint8_t port_id, uint32_t stream_id); + virtual ~TrexStream(); /* defines the min max per packet supported */ static const uint32_t MIN_PKT_SIZE_BYTES = 1; @@ -56,10 +84,89 @@ public: /* access the stream json */ const Json::Value & get_stream_json(); -protected: + /* compress the stream id to be zero based */ + void fix_dp_stream_id(uint32_t my_stream_id,int next_stream_id){ + m_stream_id = my_stream_id; + m_next_stream_id = next_stream_id; + } + + double get_pps() const { + return m_pps; + } + + void set_pps(double pps){ + m_pps = pps; + } + + void set_type(uint8_t type){ + m_type = type; + } + + uint8_t get_type(void) const { + return ( m_type ); + } + + bool is_dp_next_stream(){ + if (m_next_stream_id<0) { + return (false); + }else{ + return (true); + } + } + + + + void set_multi_burst(uint32_t burst_total_pkts, + uint32_t num_bursts, + double ibg_usec) { + m_burst_total_pkts = burst_total_pkts; + m_num_bursts = num_bursts; + m_ibg_usec = ibg_usec; + } + + void set_single_burst(uint32_t burst_total_pkts){ + set_multi_burst(burst_total_pkts,1,0.0); + } + + /* create new stream */ + TrexStream * clone_as_dp() const { + + TrexStream *dp = new TrexStream(m_type,m_port_id,m_stream_id); + + + dp->m_isg_usec = m_isg_usec; + dp->m_next_stream_id = m_next_stream_id; + + dp->m_enabled = m_enabled; + dp->m_self_start = m_self_start; + + /* deep copy */ + dp->m_pkt.clone(m_pkt.binary,m_pkt.len); + + dp->m_rx_check = m_rx_check; + dp->m_pps = m_pps; + dp->m_burst_total_pkts = m_burst_total_pkts; + dp->m_num_bursts = m_num_bursts; + dp->m_ibg_usec = m_ibg_usec ; + return (dp); + } + + + double get_burst_length_usec() const { + return ( (m_burst_total_pkts / m_pps) * 1000 * 1000); + } + + double get_bps() const { + /* packet length + 4 CRC bytes to bits and multiplied by PPS */ + return (m_pps * (m_pkt.len + 4) * 8); + } + + void Dump(FILE *fd); +public: /* basic */ + uint8_t m_type; uint8_t m_port_id; - uint32_t m_stream_id; + uint32_t m_stream_id; /* id from RPC can be anything */ /* config fields */ @@ -69,13 +176,9 @@ protected: /* indicators */ bool m_enabled; bool m_self_start; - + + CStreamPktData m_pkt; /* pkt */ - struct { - uint8_t *binary; - uint16_t len; - std::string meta; - } m_pkt; /* VM */ StreamVm m_vm; @@ -89,64 +192,19 @@ protected: } m_rx_check; + double m_pps; - /* original template provided by requester */ - Json::Value m_stream_json; -}; - -/** - * continuous stream - * - */ -class TrexStreamContinuous : public TrexStream { -public: - TrexStreamContinuous(uint8_t port_id, uint32_t stream_id, double pps) : TrexStream(port_id, stream_id), m_pps(pps) { - } + uint32_t m_burst_total_pkts; /* valid in case of burst stSINGLE_BURST,stMULTI_BURST*/ - double get_pps() { - return m_pps; - } + uint32_t m_num_bursts; /* valid in case of stMULTI_BURST */ -protected: - double m_pps; -}; + double m_ibg_usec; /* valid in case of stMULTI_BURST */ -/** - * single burst - * - */ -class TrexStreamBurst : public TrexStream { -public: - TrexStreamBurst(uint8_t port_id, uint32_t stream_id, uint32_t total_pkts, double pps) : - TrexStream(port_id, stream_id), - m_total_pkts(total_pkts), - m_pps(pps) { - } + /* original template provided by requester */ + Json::Value m_stream_json; -protected: - uint32_t m_total_pkts; - double m_pps; }; -/** - * multi burst - * - */ -class TrexStreamMultiBurst : public TrexStreamBurst { -public: - TrexStreamMultiBurst(uint8_t port_id, - uint32_t stream_id, - uint32_t pkts_per_burst, - double pps, - uint32_t num_bursts, - double ibg_usec) : TrexStreamBurst(port_id, stream_id, pkts_per_burst, pps), m_num_bursts(num_bursts), m_ibg_usec(ibg_usec) { - - } -protected: - uint32_t m_num_bursts; - double m_ibg_usec; - -}; /** * holds all the streams @@ -189,7 +247,13 @@ public: * * @param stream_list */ - void get_stream_list(std::vector<uint32_t> &stream_list); + void get_id_list(std::vector<uint32_t> &id_list); + + /** + * populate a list with all the stream objects + * + */ + void get_object_list(std::vector<TrexStream *> &object_list); /** * get the table size @@ -197,6 +261,9 @@ public: */ int size(); + std::unordered_map<int, TrexStream *>::iterator begin() {return m_stream_table.begin();} + std::unordered_map<int, TrexStream *>::iterator end() {return m_stream_table.end();} + private: /** * holds all the stream in a hash table by stream id diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp new file mode 100644 index 00000000..478e09f8 --- /dev/null +++ b/src/stateless/cp/trex_streams_compiler.cpp @@ -0,0 +1,746 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include <string> +#include <sstream> +#include <trex_streams_compiler.h> +#include <trex_stream.h> +#include <assert.h> +#include <trex_stateless.h> +#include <iostream> + +/** + * describes a graph node in the pre compile check + * + * @author imarom (16-Nov-15) + */ +class GraphNode { +public: + GraphNode(const TrexStream *stream, GraphNode *next) : m_stream(stream), m_next(next) { + m_marked = false; + m_compressed_stream_id=-1; + + } + + uint32_t get_stream_id() const { + return m_stream->m_stream_id; + } + + uint32_t get_next_stream_id() const { + return m_stream->m_next_stream_id; + + } + + const TrexStream *m_stream; + GraphNode *m_next; + std::vector<const GraphNode *> m_parents; + bool m_marked; + int m_compressed_stream_id; +}; + +/** + * node map + * + */ +class GraphNodeMap { +public: + + GraphNodeMap() : m_dead_end(NULL, NULL) { + + } + + bool add(GraphNode *node) { + if (has(node->get_stream_id())) { + return false; + } + + m_nodes[node->get_stream_id()] = node; + + if (node->m_stream->m_self_start) { + m_roots.push_back(node); + } + + return true; + } + + bool has(uint32_t stream_id) { + + return (get(stream_id) != NULL); + } + + GraphNode * get(uint32_t stream_id) { + + if (stream_id == -1) { + return &m_dead_end; + } + + auto search = m_nodes.find(stream_id); + + if (search != m_nodes.end()) { + return search->second; + } else { + return NULL; + } + } + + void clear_marks() { + for (auto node : m_nodes) { + node.second->m_marked = false; + } + } + + void get_unmarked(std::vector <GraphNode *> &unmarked) { + for (auto node : m_nodes) { + if (!node.second->m_marked) { + unmarked.push_back(node.second); + } + } + } + + + ~GraphNodeMap() { + for (auto node : m_nodes) { + delete node.second; + } + m_nodes.clear(); + } + + std::vector <GraphNode *> & get_roots() { + return m_roots; + } + + + std::unordered_map<uint32_t, GraphNode *> get_nodes() { + return m_nodes; + } + +private: + std::unordered_map<uint32_t, GraphNode *> m_nodes; + std::vector <GraphNode *> m_roots; + GraphNode m_dead_end; +}; + + +/************************************** + * stream compiled object + *************************************/ +TrexStreamsCompiledObj::TrexStreamsCompiledObj(uint8_t port_id) { + m_port_id = port_id; + m_all_continues = false; +} + +TrexStreamsCompiledObj::~TrexStreamsCompiledObj() { + for (auto obj : m_objs) { + delete obj.m_stream; + } + m_objs.clear(); +} + + +void +TrexStreamsCompiledObj::add_compiled_stream(TrexStream *stream){ + + obj_st obj; + + obj.m_stream = stream; + + m_objs.push_back(obj); +} + + +TrexStreamsCompiledObj * +TrexStreamsCompiledObj::clone() { + + TrexStreamsCompiledObj *new_compiled_obj = new TrexStreamsCompiledObj(m_port_id); + + /** + * clone each element + */ + for (auto obj : m_objs) { + TrexStream *new_stream = obj.m_stream->clone_as_dp(); + new_compiled_obj->add_compiled_stream(new_stream); + } + + return new_compiled_obj; + +} + +void TrexStreamsCompiledObj::Dump(FILE *fd){ + for (auto obj : m_objs) { + obj.m_stream->Dump(fd); + } +} + + +void +TrexStreamsCompiler::add_warning(const std::string &warning) { + m_warnings.push_back("*** warning: " + warning); +} + +void +TrexStreamsCompiler::err(const std::string &err) { + throw TrexException("*** error: " + err); +} + +void +TrexStreamsCompiler::check_stream(const TrexStream *stream) { + std::stringstream ss; + + /* cont. stream can point only on itself */ + if (stream->get_type() == TrexStream::stCONTINUOUS) { + if (stream->m_next_stream_id != -1) { + ss << "continous stream '" << stream->m_stream_id << "' cannot point to another stream"; + err(ss.str()); + } + } +} + +void +TrexStreamsCompiler::allocate_pass(const std::vector<TrexStream *> &streams, + GraphNodeMap *nodes) { + std::stringstream ss; + uint32_t compressed_stream_id=0; + + + /* first pass - allocate all nodes and check for duplicates */ + for (auto stream : streams) { + + /* skip non enabled streams */ + if (!stream->m_enabled) { + continue; + } + + /* sanity check on the stream itself */ + check_stream(stream); + + /* duplicate stream id ? */ + if (nodes->has(stream->m_stream_id)) { + ss << "duplicate instance of stream id " << stream->m_stream_id; + err(ss.str()); + } + + GraphNode *node = new GraphNode(stream, NULL); + /* allocate new compressed id */ + node->m_compressed_stream_id = compressed_stream_id; + + compressed_stream_id++; + + /* add to the map */ + assert(nodes->add(node)); + } + +} + +/** + * on this pass we direct the graph to point to the right nodes + * + */ +void +TrexStreamsCompiler::direct_pass(GraphNodeMap *nodes) { + + /* second pass - direct the graph */ + for (auto p : nodes->get_nodes()) { + + GraphNode *node = p.second; + const TrexStream *stream = node->m_stream; + + /* check the stream points on an existing stream */ + GraphNode *next_node = nodes->get(stream->m_next_stream_id); + if (!next_node) { + std::stringstream ss; + ss << "stream " << node->get_stream_id() << " is pointing on non existent stream " << stream->m_next_stream_id; + err(ss.str()); + } + + node->m_next = next_node; + + /* do we have more than one parent ? */ + next_node->m_parents.push_back(node); + } + + + /* check for multiple parents */ + for (auto p : nodes->get_nodes()) { + GraphNode *node = p.second; + + if (node->m_parents.size() > 0 ) { + std::stringstream ss; + + ss << "stream " << node->get_stream_id() << " is triggered by multiple streams: "; + for (auto x : node->m_parents) { + ss << x->get_stream_id() << " "; + } + + add_warning(ss.str()); + } + } +} + +/** + * mark sure all the streams are reachable + * + */ +void +TrexStreamsCompiler::check_for_unreachable_streams(GraphNodeMap *nodes) { + /* start with the roots */ + std::vector <GraphNode *> next_nodes = nodes->get_roots(); + + + nodes->clear_marks(); + + /* run BFS from all the roots */ + while (!next_nodes.empty()) { + + /* pull one */ + GraphNode *node = next_nodes.back(); + next_nodes.pop_back(); + if (node->m_marked) { + continue; + } + + node->m_marked = true; + + if (node->m_next != NULL) { + next_nodes.push_back(node->m_next); + } + + } + + std::vector <GraphNode *> unmarked; + nodes->get_unmarked(unmarked); + + if (!unmarked.empty()) { + std::stringstream ss; + for (auto node : unmarked) { + ss << "stream " << node->get_stream_id() << " is unreachable from any other stream\n"; + } + err(ss.str()); + } + + +} + +/** + * check validation of streams for compile + * + * @author imarom (16-Nov-15) + * + * @param streams + * @param fail_msg + * + * @return bool + */ +void +TrexStreamsCompiler::pre_compile_check(const std::vector<TrexStream *> &streams, + GraphNodeMap & nodes) { + + m_warnings.clear(); + + /* allocate nodes */ + allocate_pass(streams, &nodes); + + /* direct the graph */ + direct_pass(&nodes); + + /* check for non reachable streams inside the graph */ + check_for_unreachable_streams(&nodes); + +} + +/************************************** + * stream compiler + *************************************/ +bool +TrexStreamsCompiler::compile(uint8_t port_id, + const std::vector<TrexStream *> &streams, + std::vector<TrexStreamsCompiledObj *> &objs, + uint8_t dp_core_count, + double factor, + std::string *fail_msg) { + +#if 0 + for (auto stream : streams) { + stream->Dump(stdout); + } + fprintf(stdout,"------------pre compile \n"); +#endif + + GraphNodeMap nodes; + + + /* compile checks */ + try { + pre_compile_check(streams, nodes); + } catch (const TrexException &ex) { + if (fail_msg) { + *fail_msg = ex.what(); + } else { + std::cout << ex.what(); + } + return false; + } + + /* check if all are cont. streams */ + bool all_continues = true; + for (const auto stream : streams) { + if (stream->get_type() != TrexStream::stCONTINUOUS) { + all_continues = false; + break; + } + } + + /* allocate objects for all DP cores */ + for (uint8_t i = 0; i < dp_core_count; i++) { + TrexStreamsCompiledObj *obj = new TrexStreamsCompiledObj(port_id); + obj->m_all_continues = all_continues; + objs.push_back(obj); + } + + /* compile all the streams */ + for (auto stream : streams) { + + /* skip non-enabled streams */ + if (!stream->m_enabled) { + continue; + } + + /* compile a single stream to all cores */ + compile_stream(stream, factor, dp_core_count, objs, nodes); + + } + + return true; +} + +/** + * compiles a single stream to DP objects + * + * @author imarom (03-Dec-15) + * + */ +void +TrexStreamsCompiler::compile_stream(const TrexStream *stream, + double factor, + uint8_t dp_core_count, + std::vector<TrexStreamsCompiledObj *> &objs, + GraphNodeMap &nodes) { + + + /* fix the stream ids */ + int new_id = nodes.get(stream->m_stream_id)->m_compressed_stream_id; + assert(new_id >= 0); + + int new_next_id = -1; + if (stream->m_next_stream_id >= 0) { + new_next_id = nodes.get(stream->m_next_stream_id)->m_compressed_stream_id; + } + + /* calculate rate */ + double per_core_rate = (stream->m_pps * (factor / dp_core_count)); + int per_core_burst_total_pkts = (stream->m_burst_total_pkts / dp_core_count); + + std::vector<TrexStream *> per_core_streams(dp_core_count); + + /* for each core - creates its own version of the stream */ + for (uint8_t i = 0; i < dp_core_count; i++) { + TrexStream *dp_stream = stream->clone_as_dp(); + + /* fix stream ID */ + dp_stream->fix_dp_stream_id(new_id, new_next_id); + + + /* adjust rate and packets count */ + dp_stream->m_pps = per_core_rate; + dp_stream->m_burst_total_pkts = per_core_burst_total_pkts; + + per_core_streams[i] = dp_stream; + } + + /* take care of remainder from a burst */ + int burst_remainder = stream->m_burst_total_pkts - (per_core_burst_total_pkts * dp_core_count); + per_core_streams[0]->m_burst_total_pkts += burst_remainder; + + /* attach the compiled stream of every core to its object */ + for (uint8_t i = 0; i < dp_core_count; i++) { + objs[i]->add_compiled_stream(per_core_streams[i]); + } + + +} + +/************************************** + * streams graph + *************************************/ + +/** + * for each stream we create the right rate events (up/down) + * + * @author imarom (24-Nov-15) + * + * @param offset_usec + * @param stream + */ +void +TrexStreamsGraph::add_rate_events_for_stream(double &offset_usec, const TrexStream *stream) { + + switch (stream->get_type()) { + + case TrexStream::stCONTINUOUS: + add_rate_events_for_stream_cont(offset_usec, stream); + return; + + case TrexStream::stSINGLE_BURST: + add_rate_events_for_stream_single_burst(offset_usec, stream); + return; + + case TrexStream::stMULTI_BURST: + add_rate_events_for_stream_multi_burst(offset_usec, stream); + return; + } +} + +/** + * continous stream + * + */ +void +TrexStreamsGraph::add_rate_events_for_stream_cont(double &offset_usec, const TrexStream *stream) { + + TrexStreamsGraphObj::rate_event_st start_event; + + /* for debug purposes */ + start_event.stream_id = stream->m_stream_id; + + start_event.time = offset_usec + stream->m_isg_usec; + start_event.diff_pps = stream->get_pps(); + start_event.diff_bps = stream->get_bps(); + m_graph_obj->add_rate_event(start_event); + + /* no more events after this stream */ + offset_usec = -1; + + /* also mark we have an inifite time */ + m_graph_obj->m_expected_duration = -1; +} + +/** + * single burst stream + * + */ +void +TrexStreamsGraph::add_rate_events_for_stream_single_burst(double &offset_usec, const TrexStream *stream) { + TrexStreamsGraphObj::rate_event_st start_event; + TrexStreamsGraphObj::rate_event_st stop_event; + + + /* for debug purposes */ + start_event.stream_id = stream->m_stream_id; + stop_event.stream_id = stream->m_stream_id; + + /* start event */ + start_event.time = offset_usec + stream->m_isg_usec; + start_event.diff_pps = stream->get_pps(); + start_event.diff_bps = stream->get_bps(); + m_graph_obj->add_rate_event(start_event); + + /* stop event */ + stop_event.time = start_event.time + stream->get_burst_length_usec(); + stop_event.diff_pps = -(start_event.diff_pps); + stop_event.diff_bps = -(start_event.diff_bps); + m_graph_obj->add_rate_event(stop_event); + + /* next stream starts from here */ + offset_usec = stop_event.time; + +} + +/** + * multi burst stream + * + */ +void +TrexStreamsGraph::add_rate_events_for_stream_multi_burst(double &offset_usec, const TrexStream *stream) { + TrexStreamsGraphObj::rate_event_st start_event; + TrexStreamsGraphObj::rate_event_st stop_event; + + /* first the delay is the inter stream gap */ + double delay = stream->m_isg_usec; + + /* for debug purposes */ + + start_event.diff_pps = stream->get_pps(); + start_event.diff_bps = stream->get_bps(); + start_event.stream_id = stream->m_stream_id; + + stop_event.diff_pps = -(start_event.diff_pps); + stop_event.diff_bps = -(start_event.diff_bps); + stop_event.stream_id = stream->m_stream_id; + + /* for each burst create up/down events */ + for (int i = 0; i < stream->m_num_bursts; i++) { + + start_event.time = offset_usec + delay; + m_graph_obj->add_rate_event(start_event); + + stop_event.time = start_event.time + stream->get_burst_length_usec(); + m_graph_obj->add_rate_event(stop_event); + + /* after the first burst, the delay is inter burst gap */ + delay = stream->m_ibg_usec; + + offset_usec = stop_event.time; + } +} + +/** + * for a single root we can until done or a loop detected + * + * @author imarom (24-Nov-15) + * + * @param root_stream_id + */ +void +TrexStreamsGraph::generate_graph_for_one_root(uint32_t root_stream_id) { + + std::unordered_map<uint32_t, bool> loop_hash; + std::stringstream ss; + + uint32_t stream_id = root_stream_id; + double offset = 0; + + while (true) { + const TrexStream *stream; + + /* fetch the stream from the hash - if it is not present, report an error */ + try { + stream = m_streams_hash.at(stream_id); + } catch (const std::out_of_range &e) { + ss << "stream id " << stream_id << " does not exists"; + throw TrexException(ss.str()); + } + + /* add the node to the hash for loop detection */ + loop_hash[stream_id] = true; + + /* create the right rate events for the stream */ + add_rate_events_for_stream(offset, stream); + + /* do we have a next stream ? */ + if (stream->m_next_stream_id == -1) { + break; + } + + /* loop detection */ + auto search = loop_hash.find(stream->m_next_stream_id); + if (search != loop_hash.end()) { + m_graph_obj->on_loop_detection(); + break; + } + + /* handle the next one */ + stream_id = stream->m_next_stream_id; + } +} + +/** + * for a vector of streams generate a graph of BW + * see graph object for more details + * + */ +const TrexStreamsGraphObj * +TrexStreamsGraph::generate(const std::vector<TrexStream *> &streams) { + + /* main object to hold the graph - returned to the user */ + m_graph_obj = new TrexStreamsGraphObj(); + + std::vector <uint32_t> root_streams; + + /* before anything we create a hash streams ID + and grab the root nodes + */ + for (TrexStream *stream : streams) { + + /* skip non enabled streams */ + if (!stream->m_enabled) { + continue; + } + + /* for fast search we populate all the streams in a hash */ + m_streams_hash[stream->m_stream_id] = stream; + + /* hold all the self start nodes in a vector */ + if (stream->m_self_start) { + root_streams.push_back(stream->m_stream_id); + } + } + + /* for each node - scan until done or loop */ + for (uint32_t root_id : root_streams) { + generate_graph_for_one_root(root_id); + } + + + m_graph_obj->generate(); + + return m_graph_obj; +} + +/************************************** + * streams graph object + *************************************/ +void +TrexStreamsGraphObj::find_max_rate() { + double max_rate_pps = 0; + double current_rate_pps = 0; + + double max_rate_bps = 0; + double current_rate_bps = 0; + + /* now we simply walk the list and hold the max */ + for (auto &ev : m_rate_events) { + + current_rate_pps += ev.diff_pps; + current_rate_bps += ev.diff_bps; + + max_rate_pps = std::max(max_rate_pps, current_rate_pps); + max_rate_bps = std::max(max_rate_bps, current_rate_bps); + } + + /* if not mark as inifite - get the last event time */ + if (m_expected_duration != -1) { + m_expected_duration = m_rate_events.back().time; + } + + m_max_pps = max_rate_pps; + m_max_bps = max_rate_bps; +} + +static +bool event_compare (const TrexStreamsGraphObj::rate_event_st &first, const TrexStreamsGraphObj::rate_event_st &second) { + return (first.time < second.time); +} + +void +TrexStreamsGraphObj::generate() { + m_rate_events.sort(event_compare); + find_max_rate(); +} + diff --git a/src/stateless/cp/trex_streams_compiler.h b/src/stateless/cp/trex_streams_compiler.h new file mode 100644 index 00000000..7fe2dbf2 --- /dev/null +++ b/src/stateless/cp/trex_streams_compiler.h @@ -0,0 +1,229 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_STREAMS_COMPILER_H__ +#define __TREX_STREAMS_COMPILER_H__ + +#include <stdint.h> +#include <vector> +#include <list> +#include <string> +#include <unordered_map> + +class TrexStreamsCompiler; +class TrexStream; +class GraphNodeMap; + +/** + * compiled object for a table of streams + * + * @author imarom (28-Oct-15) + */ +class TrexStreamsCompiledObj { + friend class TrexStreamsCompiler; + +public: + + TrexStreamsCompiledObj(uint8_t port_id); + ~TrexStreamsCompiledObj(); + + struct obj_st { + + TrexStream * m_stream; + }; + + const std::vector<obj_st> & get_objects() { + return m_objs; + } + + uint8_t get_port_id(){ + return (m_port_id); + } + + bool get_all_streams_continues(){ + return (m_all_continues); + } + + void Dump(FILE *fd); + + TrexStreamsCompiledObj* clone(); + +private: + void add_compiled_stream(TrexStream *stream); + + + std::vector<obj_st> m_objs; + + bool m_all_continues; + uint8_t m_port_id; +}; + +class TrexStreamsCompiler { +public: + + /** + * compiles a vector of streams to an object passable to the DP + * + * @author imarom (28-Oct-15) + * + */ + bool compile(uint8_t port_id, + const std::vector<TrexStream *> &streams, + std::vector<TrexStreamsCompiledObj *> &objs, + uint8_t dp_core_count = 1, + double factor = 1.0, + std::string *fail_msg = NULL); + + + /** + * + * returns a reference pointer to the last compile warnings + * if no warnings were produced - the vector is empty + */ + const std::vector<std::string> & get_last_compile_warnings() { + return m_warnings; + } + +private: + + void pre_compile_check(const std::vector<TrexStream *> &streams, + GraphNodeMap & nodes); + void allocate_pass(const std::vector<TrexStream *> &streams, GraphNodeMap *nodes); + void direct_pass(GraphNodeMap *nodes); + void check_for_unreachable_streams(GraphNodeMap *nodes); + void check_stream(const TrexStream *stream); + void add_warning(const std::string &warning); + void err(const std::string &err); + + void compile_stream(const TrexStream *stream, + double factor, + uint8_t dp_core_count, + std::vector<TrexStreamsCompiledObj *> &objs, + GraphNodeMap &nodes); + + std::vector<std::string> m_warnings; +}; + +class TrexStreamsGraph; + +/************************************** + * streams graph object + * + * holds the step graph for bandwidth + *************************************/ +class TrexStreamsGraphObj { + friend class TrexStreamsGraph; + +public: + + TrexStreamsGraphObj() { + m_max_pps = 0; + m_max_bps = 0; + m_expected_duration = 0; + } + + /** + * rate event is defined by those: + * time - the time of the event on the timeline + * diff - what is the nature of the change ? + * + * @author imarom (23-Nov-15) + */ + struct rate_event_st { + double time; + double diff_pps; + double diff_bps; + uint32_t stream_id; + }; + + double get_max_pps() const { + return m_max_pps; + } + + double get_max_bps() const { + return m_max_bps; + } + + int get_duration() const { + return m_expected_duration; + } + + const std::list<rate_event_st> & get_events() const { + return m_rate_events; + } + + +private: + + void on_loop_detection() { + m_expected_duration = -1; + } + + void add_rate_event(const rate_event_st &ev) { + m_rate_events.push_back(ev); + } + + void generate(); + void find_max_rate(); + + double m_max_pps; + double m_max_bps; + int m_expected_duration; + + /* list of rate events */ + std::list<rate_event_st> m_rate_events; + +}; + +/** + * graph creator + * + * @author imarom (23-Nov-15) + */ +class TrexStreamsGraph { +public: + + TrexStreamsGraph() { + m_graph_obj = NULL; + } + + /** + * generate a sequence graph for streams + * + */ + const TrexStreamsGraphObj * generate(const std::vector<TrexStream *> &streams); + +private: + + void generate_graph_for_one_root(uint32_t root_stream_id); + + void add_rate_events_for_stream(double &offset, const TrexStream *stream); + void add_rate_events_for_stream_cont(double &offset_usec, const TrexStream *stream); + void add_rate_events_for_stream_single_burst(double &offset_usec, const TrexStream *stream); + void add_rate_events_for_stream_multi_burst(double &offset_usec, const TrexStream *stream); + + /* for fast processing of streams */ + std::unordered_map<uint32_t, const TrexStream *> m_streams_hash; + + /* main object to hold the graph - returned to the user */ + TrexStreamsGraphObj *m_graph_obj; +}; + +#endif /* __TREX_STREAMS_COMPILER_H__ */ diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index 3755b82c..22ca922d 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -1,5 +1,6 @@ /* Itay Marom + Hanoch Haim Cisco Systems, Inc. */ @@ -18,118 +19,632 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ - #include <trex_stateless_dp_core.h> -#include <stdio.h> -#include <unistd.h> -#include <trex_stateless.h> +#include <trex_stateless_messaging.h> +#include <trex_streams_compiler.h> +#include <trex_stream_node.h> +#include <trex_stream.h> #include <bp_sim.h> -#ifndef TREX_RPC_MOCK_SERVER -// DPDK c++ issue -#define UINT8_MAX 255 -#define UINT16_MAX 0xFFFF -// DPDK c++ issue -#endif +void CDpOneStream::Delete(CFlowGenListPerThread * core){ + assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE); + core->free_node((CGenNode *)m_node); + delete m_dp_stream; + m_node=0; + m_dp_stream=0; +} -#include <rte_ethdev.h> -#include "mbuf.h" +void CDpOneStream::DeleteOnlyStream(){ + assert(m_dp_stream); + delete m_dp_stream; + m_dp_stream=0; +} -/** - * TEST - * - */ -static const uint8_t udp_pkt[]={ - 0x00,0x00,0x00,0x01,0x00,0x00, - 0x00,0x00,0x00,0x01,0x00,0x00, - 0x08,0x00, - - 0x45,0x00,0x00,0x81, - 0xaf,0x7e,0x00,0x00, - 0x12,0x11,0xd9,0x23, - 0x01,0x01,0x01,0x01, - 0x3d,0xad,0x72,0x1b, - - 0x11,0x11, - 0x11,0x11, - - 0x00,0x6d, - 0x00,0x00, - - 0x64,0x31,0x3a,0x61, - 0x64,0x32,0x3a,0x69,0x64, - 0x32,0x30,0x3a,0xd0,0x0e, - 0xa1,0x4b,0x7b,0xbd,0xbd, - 0x16,0xc6,0xdb,0xc4,0xbb,0x43, - 0xf9,0x4b,0x51,0x68,0x33,0x72, - 0x20,0x39,0x3a,0x69,0x6e,0x66,0x6f, - 0x5f,0x68,0x61,0x73,0x68,0x32,0x30,0x3a,0xee,0xc6,0xa3, - 0xd3,0x13,0xa8,0x43,0x06,0x03,0xd8,0x9e,0x3f,0x67,0x6f, - 0xe7,0x0a,0xfd,0x18,0x13,0x8d,0x65,0x31,0x3a,0x71,0x39, - 0x3a,0x67,0x65,0x74,0x5f,0x70,0x65,0x65,0x72,0x73,0x31, - 0x3a,0x74,0x38,0x3a,0x3d,0xeb,0x0c,0xbf,0x0d,0x6a,0x0d, - 0xa5,0x31,0x3a,0x79,0x31,0x3a,0x71,0x65,0x87,0xa6,0x7d, - 0xe7 -}; - -static int -test_inject_pkt(uint8_t *pkt, uint32_t pkt_size) { - - #ifndef TREX_RPC_MOCK_SERVER - rte_mempool_t * mp= CGlobalInfo::m_mem_pool[0].m_big_mbuf_pool ; - #else - rte_mempool_t * mp = NULL; - #endif - - rte_mbuf_t *m = rte_pktmbuf_alloc(mp); - if ( unlikely(m==0) ) { - printf("ERROR no packets \n"); - return (-1); +int CGenNodeStateless::get_stream_id(){ + if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) { + return (-1); // not valid } - char *p = rte_pktmbuf_append(m, pkt_size); - assert(p); - /* set pkt data */ - memcpy(p,pkt,pkt_size); + assert(m_ref_stream_info); + return ((int)m_ref_stream_info->m_stream_id); +} + + +void CGenNodeStateless::DumpHeader(FILE *fd){ + fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n"); + +} +void CGenNodeStateless::Dump(FILE *fd){ + fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu \n", + m_time, + (ulong)m_port_id, + "s-pkt", //action + get_stream_state_str(m_state ).c_str(), + get_stream_id(), //stream_id + TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype + (ulong)m_multi_bursts, + (ulong)m_single_burst + ); +} + + +void CGenNodeStateless::refresh(){ + + /* refill the stream info */ + m_single_burst = m_single_burst_refill; + m_multi_bursts = m_ref_stream_info->m_num_bursts; + m_state = CGenNodeStateless::ss_ACTIVE; +} + + +void CGenNodeCommand::free_command(){ + + assert(m_cmd); + m_cmd->on_node_remove(); + delete m_cmd; +} + + +std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){ + std::string res; + + switch (stream_state) { + case CGenNodeStateless::ss_FREE_RESUSE : + res="FREE "; + break; + case CGenNodeStateless::ss_INACTIVE : + res="INACTIVE "; + break; + case CGenNodeStateless::ss_ACTIVE : + res="ACTIVE "; + break; + default: + res="Unknow "; + }; + return(res); +} + + +void CGenNodeStateless::free_stl_node(){ + /* if we have cache mbuf free it */ + rte_mbuf_t * m=get_cache_mbuf(); + if (m) { + rte_pktmbuf_free(m); + m_cache_mbuf=0; + } +} + + +bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){ + m_active_streams-=d; /* reduce the number of streams */ + if (m_active_streams == 0) { + return (true); + } + return (false); +} + +bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){ + + /* we are working with continues streams so we must be in transmit mode */ + assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE); + + for (auto dp_stream : m_active_nodes) { + CGenNodeStateless * node =dp_stream.m_node; + assert(node->get_port_id() == port_id); + assert(node->is_pause() == true); + node->set_pause(false); + } + m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING; + return (true); +} + +bool TrexStatelessDpPerPort::update_traffic(uint8_t port_id, double factor) { + + assert( (m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING || + (m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE)) ); + + for (auto dp_stream : m_active_nodes) { + CGenNodeStateless * node = dp_stream.m_node; + assert(node->get_port_id() == port_id); - rte_mbuf_t *tx_pkts[32]; - tx_pkts[0] = m; - uint8_t nb_pkts = 1; - uint16_t ret = rte_eth_tx_burst(0, 0, tx_pkts, nb_pkts); - (void)ret; - rte_pktmbuf_free(m); + node->update_rate(factor); + } - return (0); + return (true); } -static int -test_inject_udp_pkt(){ - return (test_inject_pkt((uint8_t*)udp_pkt,sizeof(udp_pkt))); +bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){ + + /* we are working with continues streams so we must be in transmit mode */ + assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING); + + for (auto dp_stream : m_active_nodes) { + CGenNodeStateless * node =dp_stream.m_node; + assert(node->get_port_id() == port_id); + assert(node->is_pause() == false); + node->set_pause(true); + } + m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE; + return (true); } + +bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id, + bool stop_on_id, + int event_id){ + + + if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) { + assert(m_active_streams==0); + return false; + } + + /* there could be race of stop after stop */ + if ( stop_on_id ) { + if (event_id != m_event_id){ + /* we can't stop it is an old message */ + return false; + } + } + + for (auto dp_stream : m_active_nodes) { + CGenNodeStateless * node =dp_stream.m_node; + assert(node->get_port_id() == port_id); + if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) { + node->mark_for_free(); + m_active_streams--; + dp_stream.DeleteOnlyStream(); + + }else{ + dp_stream.Delete(m_core); + } + } + + /* active stream should be zero */ + assert(m_active_streams==0); + m_active_nodes.clear(); + m_state=TrexStatelessDpPerPort::ppSTATE_IDLE; + return (true); +} + + +void TrexStatelessDpPerPort::create(CFlowGenListPerThread * core){ + m_core=core; + m_state=TrexStatelessDpPerPort::ppSTATE_IDLE; + m_port_id=0; + m_active_streams=0; + m_active_nodes.clear(); +} + + + void -TrexStatelessDpCore::test_inject_dummy_pkt() { - test_inject_udp_pkt(); +TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) { + m_thread_id = thread_id; + m_core = core; + m_local_port_offset = 2*core->getDualPortId(); + + CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp(); + + m_ring_from_cp = cp_dp->getRingCpToDp(thread_id); + m_ring_to_cp = cp_dp->getRingDpToCp(thread_id); + + m_state = STATE_IDLE; + + int i; + for (i=0; i<NUM_PORTS_PER_CORE; i++) { + m_ports[i].create(core); + } +} + + +/* move to the next stream, old stream move to INACTIVE */ +bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node, + CGenNodeStateless * next_node){ + + assert(cur_node); + TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id); + bool schedule =false; + + bool to_stop_port=false; + + if (next_node == NULL) { + /* there is no next stream , reduce the number of active streams*/ + to_stop_port = lp_port->update_number_of_active_streams(1); + + }else{ + uint8_t state=next_node->get_state(); + + /* can't be FREE_RESUSE */ + assert(state != CGenNodeStateless::ss_FREE_RESUSE); + if (next_node->get_state() == CGenNodeStateless::ss_INACTIVE ) { + + /* refill start info and scedule, no update in active streams */ + next_node->refresh(); + schedule = true; + + }else{ + to_stop_port = lp_port->update_number_of_active_streams(1); + } + } + + if ( to_stop_port ) { + /* call stop port explictly to move the state */ + stop_traffic(cur_node->m_port_id,false,0); + } + + return ( schedule ); } -/*************************** - * DP core + + +/** + * in idle state loop, the processor most of the time sleeps + * and periodically checks for messages * - **************************/ -TrexStatelessDpCore::TrexStatelessDpCore(uint8_t core_id) : m_core_id(core_id) { + * @author imarom (01-Nov-15) + */ +void +TrexStatelessDpCore::idle_state_loop() { + + while (m_state == STATE_IDLE) { + periodic_check_for_cp_messages(); + delay(200); + } +} + + + +void TrexStatelessDpCore::quit_main_loop(){ + m_core->set_terminate_mode(true); /* mark it as terminated */ + m_state = STATE_TERMINATE; + add_global_duration(0.0001); } + /** - * main function for DP core + * scehduler runs when traffic exists + * it will return when no more transmitting is done on this + * core * + * @author imarom (01-Nov-15) */ void -TrexStatelessDpCore::run() { - printf("\nOn DP core %d\n", m_core_id); +TrexStatelessDpCore::start_scheduler() { + /* creates a maintenace job using the scheduler */ + CGenNode * node_sync = m_core->create_node() ; + node_sync->m_type = CGenNode::FLOW_SYNC; + node_sync->m_time = m_core->m_cur_time_sec + SYNC_TIME_OUT; + m_core->m_node_gen.add_node(node_sync); + + double old_offset = 0.0; + m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset); + /* bail out in case of terminate */ + if (m_state != TrexStatelessDpCore::STATE_TERMINATE) { + m_core->m_node_gen.close_file(m_core); + m_state = STATE_IDLE; /* we exit from all ports and we have nothing to do, we move to IDLE state */ + } +} + + +void +TrexStatelessDpCore::run_once(){ + + idle_state_loop(); + + if ( m_state == STATE_TERMINATE ){ + return; + } + + start_scheduler(); +} + + + + +void +TrexStatelessDpCore::start() { + while (true) { - test_inject_dummy_pkt(); - rte_pause(); + run_once(); + + if ( m_core->is_terminated_by_master() ) { + break; + } + } +} + +/* only if both port are idle we can exit */ +void +TrexStatelessDpCore::schedule_exit(){ + + CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ; + + node->m_type = CGenNode::COMMAND; + + node->m_cmd = new TrexStatelessDpCanQuit(); + + /* make sure it will be scheduled after the current node */ + node->m_time = m_core->m_cur_time_sec ; + + m_core->m_node_gen.add_node((CGenNode *)node); +} + + +void +TrexStatelessDpCore::add_global_duration(double duration){ + if (duration > 0.0) { + CGenNode *node = m_core->create_node() ; + + node->m_type = CGenNode::EXIT_SCHED; + + /* make sure it will be scheduled after the current node */ + node->m_time = m_core->m_cur_time_sec + duration ; + + m_core->m_node_gen.add_node(node); + } +} + +/* add per port exit */ +void +TrexStatelessDpCore::add_port_duration(double duration, + uint8_t port_id, + int event_id){ + if (duration > 0.0) { + CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ; + + node->m_type = CGenNode::COMMAND; + + /* make sure it will be scheduled after the current node */ + node->m_time = m_core->m_cur_time_sec + duration ; + + TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id); + + + /* test this */ + m_core->m_non_active_nodes++; + cmd->set_core_ptr(m_core); + cmd->set_event_id(event_id); + cmd->set_wait_for_event_id(true); + + node->m_cmd = cmd; + + m_core->m_node_gen.add_node((CGenNode *)node); + } +} + + +void +TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port, + TrexStream * stream, + TrexStreamsCompiledObj *comp) { + + CGenNodeStateless *node = m_core->create_node_sl(); + + /* add periodic */ + node->m_type = CGenNode::STATELESS_PKT; + + node->m_ref_stream_info = stream->clone_as_dp(); + + node->m_next_stream=0; /* will be fixed later */ + + + if ( stream->m_self_start ){ + /* if self start it is in active mode */ + node->m_state =CGenNodeStateless::ss_ACTIVE; + lp_port->m_active_streams++; + }else{ + node->m_state =CGenNodeStateless::ss_INACTIVE; } + + node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec); + + pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id); + node->m_flags = 0; + + /* set socket id */ + node->set_socket_id(m_core->m_node_gen.m_socket_id); + + /* build a mbuf from a packet */ + + uint16_t pkt_size = stream->m_pkt.len; + const uint8_t *stream_pkt = stream->m_pkt.binary; + + node->m_pause =0; + node->m_stream_type = stream->m_type; + node->m_next_time_offset = 1.0 / stream->get_pps(); + + /* stateless specific fields */ + switch ( stream->m_type ) { + + case TrexStream::stCONTINUOUS : + node->m_single_burst=0; + node->m_single_burst_refill=0; + node->m_multi_bursts=0; + node->m_ibg_sec = 0.0; + break; + + case TrexStream::stSINGLE_BURST : + node->m_stream_type = TrexStream::stMULTI_BURST; + node->m_single_burst = stream->m_burst_total_pkts; + node->m_single_burst_refill = stream->m_burst_total_pkts; + node->m_multi_bursts = 1; /* single burst in multi burst of 1 */ + node->m_ibg_sec = 0.0; + break; + + case TrexStream::stMULTI_BURST : + node->m_single_burst = stream->m_burst_total_pkts; + node->m_single_burst_refill = stream->m_burst_total_pkts; + node->m_multi_bursts = stream->m_num_bursts; + node->m_ibg_sec = usec_to_sec( stream->m_ibg_usec ); + break; + default: + + assert(0); + }; + + node->m_port_id = stream->m_port_id; + + /* allocate const mbuf */ + rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size); + assert(m); + + char *p = rte_pktmbuf_append(m, pkt_size); + assert(p); + /* copy the packet */ + memcpy(p,stream_pkt,pkt_size); + + /* set dir 0 or 1 client or server */ + node->set_mbuf_cache_dir(dir); + + /* TBD repace the mac if req we should add flag */ + m_core->m_node_gen.m_v_if->update_mac_addr_from_global_cfg(dir, m); + + /* set the packet as a readonly */ + node->set_cache_mbuf(m); + + CDpOneStream one_stream; + + one_stream.m_dp_stream = node->m_ref_stream_info; + one_stream.m_node =node; + + lp_port->m_active_nodes.push_back(one_stream); + + /* schedule only if active */ + if (node->m_state == CGenNodeStateless::ss_ACTIVE) { + m_core->m_node_gen.add_node((CGenNode *)node); + } +} + +void +TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj, + double duration, + int event_id) { + + + TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id()); + lp_port->m_active_streams = 0; + lp_port->set_event_id(event_id); + + /* no nodes in the list */ + assert(lp_port->m_active_nodes.size()==0); + + for (auto single_stream : obj->get_objects()) { + /* all commands should be for the same port */ + assert(obj->get_port_id() == single_stream.m_stream->m_port_id); + add_stream(lp_port,single_stream.m_stream,obj); + } + + uint32_t nodes = lp_port->m_active_nodes.size(); + /* find next stream */ + assert(nodes == obj->get_objects().size()); + + int cnt=0; + + /* set the next_stream pointer */ + for (auto single_stream : obj->get_objects()) { + + if (single_stream.m_stream->is_dp_next_stream() ) { + int stream_id = single_stream.m_stream->m_next_stream_id; + assert(stream_id<nodes); + /* point to the next stream , stream_id is fixed */ + lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ; + } + cnt++; + } + + lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING; + m_state = TrexStatelessDpCore::STATE_TRANSMITTING; + + + if ( duration > 0.0 ){ + add_port_duration( duration ,obj->get_port_id(),event_id ); + } + +} + + +bool TrexStatelessDpCore::are_all_ports_idle(){ + + bool res=true; + int i; + for (i=0; i<NUM_PORTS_PER_CORE; i++) { + if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){ + res=false; + } + } + return (res); +} + + +void +TrexStatelessDpCore::resume_traffic(uint8_t port_id){ + + TrexStatelessDpPerPort * lp_port = get_port_db(port_id); + + lp_port->resume_traffic(port_id); +} + + +void +TrexStatelessDpCore::pause_traffic(uint8_t port_id){ + + TrexStatelessDpPerPort * lp_port = get_port_db(port_id); + + lp_port->pause_traffic(port_id); +} + +void +TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) { + + TrexStatelessDpPerPort * lp_port = get_port_db(port_id); + + lp_port->update_traffic(port_id, factor); +} + + +void +TrexStatelessDpCore::stop_traffic(uint8_t port_id, + bool stop_on_id, + int event_id) { + /* we cannot remove nodes not from the top of the queue so + for every active node - make sure next time + the scheduler invokes it, it will be free */ + + TrexStatelessDpPerPort * lp_port = get_port_db(port_id); + + if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){ + /* nothing to do ! already stopped */ + //printf(" skip .. %f\n",m_core->m_cur_time_sec); + return; + } + +#if 0 + if ( are_all_ports_idle() ) { + /* just a place holder if we will need to do somthing in that case */ + } +#endif + + /* inform the control plane we stopped - this might be a async stop + (streams ended) + */ + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id); + TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id, + port_id, + TrexDpPortEvent::EVENT_STOP, + lp_port->get_event_id()); + ring->Enqueue((CGenNode *)event_msg); + +} + +/** + * handle a message from CP to DP + * + */ +void +TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) { + msg->handle(this); + delete msg; } diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h index 4b09b752..7dc4a2b2 100644 --- a/src/stateless/dp/trex_stateless_dp_core.h +++ b/src/stateless/dp/trex_stateless_dp_core.h @@ -1,5 +1,6 @@ /* Itay Marom + Hanoch Haim Cisco Systems, Inc. */ @@ -21,23 +22,262 @@ limitations under the License. #ifndef __TREX_STATELESS_DP_CORE_H__ #define __TREX_STATELESS_DP_CORE_H__ -#include <stdint.h> +#include <vector> + +#include <msg_manager.h> +#include <pal_utl.h> + +class TrexStatelessCpToDpMsgBase; +class TrexStatelessDpStart; +class CFlowGenListPerThread; +class CGenNodeStateless; +class TrexStreamsCompiledObj; +class TrexStream; + + +class CDpOneStream { +public: + void Create(){ + } + + void Delete(CFlowGenListPerThread * core); + void DeleteOnlyStream(); + + CGenNodeStateless * m_node; // schedule node + TrexStream * m_dp_stream; // stream info +}; + +class TrexStatelessDpPerPort { + +public: + /* states */ + enum state_e { + ppSTATE_IDLE, + ppSTATE_TRANSMITTING, + ppSTATE_PAUSE + + }; + +public: + TrexStatelessDpPerPort(){ + } + + void create(CFlowGenListPerThread * core); + + bool pause_traffic(uint8_t port_id); + + bool resume_traffic(uint8_t port_id); + + bool update_traffic(uint8_t port_id, double factor); + + bool stop_traffic(uint8_t port_id, + bool stop_on_id, + int event_id); + + bool update_number_of_active_streams(uint32_t d); + + state_e get_state() { + return m_state; + } + + void set_event_id(int event_id) { + m_event_id = event_id; + } + + int get_event_id() { + return m_event_id; + } + +public: + + state_e m_state; + uint8_t m_port_id; + + uint32_t m_active_streams; /* how many active streams on this port */ + + std::vector<CDpOneStream> m_active_nodes; /* holds the current active nodes */ + CFlowGenListPerThread * m_core ; + int m_event_id; +}; + +/* for now */ +#define NUM_PORTS_PER_CORE 2 -/** - * stateless DP core object - * - */ class TrexStatelessDpCore { + public: - TrexStatelessDpCore(uint8_t core_id); + /* states */ + enum state_e { + STATE_IDLE, + STATE_TRANSMITTING, + STATE_TERMINATE + + }; + + TrexStatelessDpCore() { + m_thread_id = 0; + m_core = NULL; + m_duration = -1; + } + + /** + * "static constructor" + * + * @param thread_id + * @param core + */ + void create(uint8_t thread_id, CFlowGenListPerThread *core); + + /** + * launch the stateless DP core code + * + */ + void start(); + + + /* exit after batch of commands */ + void run_once(); + + /** + * dummy traffic creator + * + * @author imarom (27-Oct-15) + * + * @param pkt + * @param pkt_len + */ + void start_traffic(TrexStreamsCompiledObj *obj, + double duration, + int m_event_id); + + + /* pause the streams, work only if all are continues */ + void pause_traffic(uint8_t port_id); + + + + void resume_traffic(uint8_t port_id); + + + /** + * update current traffic rate + * + * @author imarom (25-Nov-15) + * + */ + void update_traffic(uint8_t port_id, double mul); + + /** + * + * stop all traffic for this core + * + */ + void stop_traffic(uint8_t port_id,bool stop_on_id, int event_id); + + + /* return if all ports are idel */ + bool are_all_ports_idle(); + + + /** + * check for and handle messages from CP + * + * @author imarom (27-Oct-15) + */ + void periodic_check_for_cp_messages() { + // doing this inline for performance reasons + + /* fast path */ + if ( likely ( m_ring_from_cp->isEmpty() ) ) { + return; + } + + while ( true ) { + CGenNode * node = NULL; + if (m_ring_from_cp->Dequeue(node) != 0) { + break; + } + assert(node); + + TrexStatelessCpToDpMsgBase * msg = (TrexStatelessCpToDpMsgBase *)node; + handle_cp_msg(msg); + } + + } + + /* quit the main loop, work in both stateless in stateful, don't free memory trigger from master */ + void quit_main_loop(); + + state_e get_state() { + return m_state; + } + + bool set_stateless_next_node(CGenNodeStateless * cur_node, + CGenNodeStateless * next_node); + + + TrexStatelessDpPerPort * get_port_db(uint8_t port_id){ + assert((m_local_port_offset==port_id) ||(m_local_port_offset+1==port_id)); + uint8_t local_port_id = port_id -m_local_port_offset; + assert(local_port_id<NUM_PORTS_PER_CORE); + return (&m_ports[local_port_id]); + } + - /* starts the DP core run */ - void run(); private: - void test_inject_dummy_pkt(); - uint8_t m_core_id; + + void schedule_exit(); + + + /** + * in idle state loop, the processor most of the time sleeps + * and periodically checks for messages + * + */ + void idle_state_loop(); + + /** + * real job is done when scheduler is launched + * + */ + void start_scheduler(); + + /** + * handles a CP to DP message + * + * @author imarom (27-Oct-15) + * + * @param msg + */ + void handle_cp_msg(TrexStatelessCpToDpMsgBase *msg); + + + void add_port_duration(double duration, + uint8_t port_id, + int event_id); + + void add_global_duration(double duration); + + void add_stream(TrexStatelessDpPerPort * lp_port, + TrexStream * stream, + TrexStreamsCompiledObj *comp); + + uint8_t m_thread_id; + uint8_t m_local_port_offset; + + state_e m_state; /* state of all ports */ + CNodeRing *m_ring_from_cp; + CNodeRing *m_ring_to_cp; + + TrexStatelessDpPerPort m_ports[NUM_PORTS_PER_CORE]; + + /* pointer to the main object */ + CFlowGenListPerThread * m_core; + + double m_duration; }; #endif /* __TREX_STATELESS_DP_CORE_H__ */ + diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h new file mode 100644 index 00000000..111af845 --- /dev/null +++ b/src/stateless/dp/trex_stream_node.h @@ -0,0 +1,284 @@ +/* + Itay Marom + Hanoh Haim + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_STREAM_NODE_H__ +#define __TREX_STREAM_NODE_H__ + +#include <bp_sim.h> +#include <stdio.h> + +class TrexStatelessDpCore; +#include <trex_stream.h> + +class TrexStatelessCpToDpMsgBase; +class CFlowGenListPerThread; + +struct CGenNodeCommand : public CGenNodeBase { + +friend class TrexStatelessDpCore; + +public: + TrexStatelessCpToDpMsgBase * m_cmd; + + uint8_t m_pad_end[104]; + +public: + void free_command(); + +} __rte_cache_aligned;; + + +static_assert(sizeof(CGenNodeCommand) == sizeof(CGenNode), "sizeof(CGenNodeCommand) != sizeof(CGenNode)" ); + + +/* this is a event for stateless */ +struct CGenNodeStateless : public CGenNodeBase { +friend class TrexStatelessDpCore; + +public: + enum { + ss_FREE_RESUSE =1, /* should be free by scheduler */ + ss_INACTIVE =2, /* will be active by other stream or stopped */ + ss_ACTIVE =3 /* the stream is active */ + }; + typedef uint8_t stream_state_t ; + + static std::string get_stream_state_str(stream_state_t stream_state); + +private: + /* cache line 0 */ + /* important stuff here */ + void * m_cache_mbuf; + + double m_next_time_offset; /* in sec */ + double m_ibg_sec; /* inter burst time in sec */ + + + stream_state_t m_state; + uint8_t m_port_id; + uint8_t m_stream_type; /* see TrexStream::STREAM_TYPE ,stream_type_t */ + uint8_t m_pause; + + uint32_t m_single_burst; /* the number of bursts in case of burst */ + uint32_t m_single_burst_refill; + + uint32_t m_multi_bursts; /* in case of multi_burst how many bursts */ + + /* cache line 1 */ + TrexStream * m_ref_stream_info; /* the stream info */ + CGenNodeStateless * m_next_stream; + + /* pad to match the size of CGenNode */ + uint8_t m_pad_end[56]; + + + + +public: + + uint8_t get_port_id(){ + return (m_port_id); + } + + + /** + * calculate the time offset based + * on the PPS and multiplier + * + */ + void update_rate(double factor) { + /* update the inter packet gap */ + m_next_time_offset = m_next_time_offset / factor; + } + + /* we restart the stream, schedule it using stream isg */ + inline void update_refresh_time(double cur_time){ + m_time = cur_time + usec_to_sec(m_ref_stream_info->m_isg_usec); + } + + inline bool is_mask_for_free(){ + return (get_state() == CGenNodeStateless::ss_FREE_RESUSE ?true:false); + + } + inline void mark_for_free(){ + set_state(CGenNodeStateless::ss_FREE_RESUSE); + /* only to be safe */ + m_ref_stream_info= NULL; + m_next_stream= NULL; + } + + bool is_pause(){ + return (m_pause==1?true:false); + } + + void set_pause(bool enable){ + if ( enable ){ + m_pause=1; + }else{ + m_pause=0; + } + } + + inline uint8_t get_stream_type(){ + return (m_stream_type); + } + + inline uint32_t get_single_burst_cnt(){ + return (m_single_burst); + } + + inline double get_multi_ibg_sec(){ + return (m_ibg_sec); + } + + inline uint32_t get_multi_burst_cnt(){ + return (m_multi_bursts); + } + + inline void set_state(stream_state_t new_state){ + m_state=new_state; + } + + + inline stream_state_t get_state() { + return m_state; + } + + void refresh(); + + inline void handle_continues(CFlowGenListPerThread *thread) { + + if (unlikely (is_pause()==false)) { + thread->m_node_gen.m_v_if->send_node( (CGenNode *)this); + } + + /* in case of continues */ + m_time += m_next_time_offset; + + /* insert a new event */ + thread->m_node_gen.m_p_queue.push( (CGenNode *)this); + } + + inline void handle_multi_burst(CFlowGenListPerThread *thread) { + thread->m_node_gen.m_v_if->send_node( (CGenNode *)this); + + m_single_burst--; + if (m_single_burst > 0 ) { + /* in case of continues */ + m_time += m_next_time_offset; + + thread->m_node_gen.m_p_queue.push( (CGenNode *)this); + }else{ + m_multi_bursts--; + if ( m_multi_bursts == 0 ) { + set_state(CGenNodeStateless::ss_INACTIVE); + if ( thread->set_stateless_next_node(this,m_next_stream) ){ + /* update the next stream time using isg */ + m_next_stream->update_refresh_time(m_time); + + thread->m_node_gen.m_p_queue.push( (CGenNode *)m_next_stream); + }else{ + // in case of zero we will schedule a command to stop + // will be called from set_stateless_next_node + } + + }else{ + m_time += m_ibg_sec; + m_single_burst = m_single_burst_refill; + thread->m_node_gen.m_p_queue.push( (CGenNode *)this); + } + } + } + + + /** + * main function to handle an event of a packet tx + * + * + * + */ + + inline void handle(CFlowGenListPerThread *thread) { + + if (m_stream_type == TrexStream::stCONTINUOUS ) { + handle_continues(thread) ; + }else{ + if (m_stream_type == TrexStream::stMULTI_BURST) { + handle_multi_burst(thread); + }else{ + assert(0); + } + } + + } + + void set_socket_id(socket_id_t socket){ + m_socket_id=socket; + } + + socket_id_t get_socket_id(){ + return ( m_socket_id ); + } + + inline void set_mbuf_cache_dir(pkt_dir_t dir){ + if (dir) { + m_flags |=NODE_FLAGS_DIR; + }else{ + m_flags &=~NODE_FLAGS_DIR; + } + } + + inline pkt_dir_t get_mbuf_cache_dir(){ + return ((pkt_dir_t)( m_flags &1)); + } + + inline void set_cache_mbuf(rte_mbuf_t * m){ + m_cache_mbuf=(void *)m; + m_flags |= NODE_FLAGS_MBUF_CACHE; + } + + inline rte_mbuf_t * get_cache_mbuf(){ + if ( m_flags &NODE_FLAGS_MBUF_CACHE ) { + return ((rte_mbuf_t *)m_cache_mbuf); + }else{ + return ((rte_mbuf_t *)0); + } + } + + void free_stl_node(); + +public: + /* debug functions */ + + int get_stream_id(); + + static void DumpHeader(FILE *fd); + + void Dump(FILE *fd); + +} __rte_cache_aligned; + +static_assert(sizeof(CGenNodeStateless) == sizeof(CGenNode), "sizeof(CGenNodeStateless) != sizeof(CGenNode)" ); + + + + +#endif /* __TREX_STREAM_NODE_H__ */ diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp new file mode 100644 index 00000000..257de168 --- /dev/null +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -0,0 +1,191 @@ +/* + Itay Marom + Hanoch Haim + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#include <trex_stateless_messaging.h> +#include <trex_stateless_dp_core.h> +#include <trex_streams_compiler.h> +#include <trex_stateless.h> +#include <bp_sim.h> + +#include <string.h> + +/************************* + start traffic message + ************************/ +TrexStatelessDpStart::TrexStatelessDpStart(uint8_t port_id, int event_id, TrexStreamsCompiledObj *obj, double duration) { + m_port_id = port_id; + m_event_id = event_id; + m_obj = obj; + m_duration = duration; +} + + +/** + * clone for DP start message + * + */ +TrexStatelessCpToDpMsgBase * +TrexStatelessDpStart::clone() { + + TrexStreamsCompiledObj *new_obj = m_obj->clone(); + + TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpStart(m_port_id, m_event_id, new_obj, m_duration); + + return new_msg; +} + +TrexStatelessDpStart::~TrexStatelessDpStart() { + if (m_obj) { + delete m_obj; + } +} + +bool +TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) { + + /* staet traffic */ + dp_core->start_traffic(m_obj, m_duration,m_event_id); + + return true; +} + +/************************* + stop traffic message + ************************/ +bool +TrexStatelessDpStop::handle(TrexStatelessDpCore *dp_core) { + + + dp_core->stop_traffic(m_port_id,m_stop_only_for_event_id,m_event_id); + return true; +} + + +void TrexStatelessDpStop::on_node_remove(){ + if ( m_core ) { + assert(m_core->m_non_active_nodes>0); + m_core->m_non_active_nodes--; + } +} + + +TrexStatelessCpToDpMsgBase * TrexStatelessDpPause::clone(){ + + TrexStatelessDpPause *new_msg = new TrexStatelessDpPause(m_port_id); + return new_msg; +} + + +bool TrexStatelessDpPause::handle(TrexStatelessDpCore *dp_core){ + dp_core->pause_traffic(m_port_id); + return (true); +} + + + +TrexStatelessCpToDpMsgBase * TrexStatelessDpResume::clone(){ + TrexStatelessDpResume *new_msg = new TrexStatelessDpResume(m_port_id); + return new_msg; +} + +bool TrexStatelessDpResume::handle(TrexStatelessDpCore *dp_core){ + dp_core->resume_traffic(m_port_id); + return (true); +} + + +/** + * clone for DP stop message + * + */ +TrexStatelessCpToDpMsgBase * +TrexStatelessDpStop::clone() { + TrexStatelessDpStop *new_msg = new TrexStatelessDpStop(m_port_id); + + new_msg->set_event_id(m_event_id); + new_msg->set_wait_for_event_id(m_stop_only_for_event_id); + /* set back pointer to master */ + new_msg->set_core_ptr(m_core); + + return new_msg; +} + + + +TrexStatelessCpToDpMsgBase * +TrexStatelessDpQuit::clone(){ + + TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpQuit(); + + return new_msg; +} + + +bool TrexStatelessDpQuit::handle(TrexStatelessDpCore *dp_core){ + + /* quit */ + dp_core->quit_main_loop(); + return (true); +} + +bool TrexStatelessDpCanQuit::handle(TrexStatelessDpCore *dp_core){ + + if ( dp_core->are_all_ports_idle() ){ + /* if all ports are idle quit now */ + set_quit(true); + } + return (true); +} + +TrexStatelessCpToDpMsgBase * +TrexStatelessDpCanQuit::clone(){ + + TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpCanQuit(); + + return new_msg; +} + +/************************* + update traffic message + ************************/ +bool +TrexStatelessDpUpdate::handle(TrexStatelessDpCore *dp_core) { + dp_core->update_traffic(m_port_id, m_factor); + + return true; +} + +TrexStatelessCpToDpMsgBase * +TrexStatelessDpUpdate::clone() { + TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpUpdate(m_port_id, m_factor); + + return new_msg; +} + +/************************* messages from DP to CP **********************/ +bool +TrexDpPortEventMsg::handle() { + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(m_port_id); + port->get_dp_events().handle_event(m_event_type, m_thread_id, m_event_id); + + return (true); +} + diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h new file mode 100644 index 00000000..d56596bf --- /dev/null +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -0,0 +1,319 @@ +/* + Itay Marom + Hanoch Haim + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_STATELESS_MESSAGING_H__ +#define __TREX_STATELESS_MESSAGING_H__ + +#include <msg_manager.h> +#include <trex_dp_port_events.h> + +class TrexStatelessDpCore; +class TrexStreamsCompiledObj; +class CFlowGenListPerThread; + +/** + * defines the base class for CP to DP messages + * + * @author imarom (27-Oct-15) + */ +class TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessCpToDpMsgBase() { + m_quit_scheduler=false; + } + + virtual ~TrexStatelessCpToDpMsgBase() { + } + + + virtual bool handle(TrexStatelessDpCore *dp_core) = 0; + + /** + * clone the current message + * + */ + virtual TrexStatelessCpToDpMsgBase * clone() = 0; + + /* do we want to quit scheduler, can be set by handle function */ + void set_quit(bool enable){ + m_quit_scheduler=enable; + } + + bool is_quit(){ + return ( m_quit_scheduler); + } + + /* this node is called from scheduler in case the node is free */ + virtual void on_node_remove(){ + } + + /* no copy constructor */ + TrexStatelessCpToDpMsgBase(TrexStatelessCpToDpMsgBase &) = delete; + +protected: + int m_event_id; + bool m_quit_scheduler; +}; + +/** + * a message to start traffic + * + * @author imarom (27-Oct-15) + */ +class TrexStatelessDpStart : public TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessDpStart(uint8_t m_port_id, int m_event_id, TrexStreamsCompiledObj *obj, double duration); + + ~TrexStatelessDpStart(); + + virtual TrexStatelessCpToDpMsgBase * clone(); + + virtual bool handle(TrexStatelessDpCore *dp_core); + +private: + + uint8_t m_port_id; + int m_event_id; + TrexStreamsCompiledObj *m_obj; + double m_duration; + +}; + +class TrexStatelessDpPause : public TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessDpPause(uint8_t port_id) : m_port_id(port_id) { + } + + + virtual TrexStatelessCpToDpMsgBase * clone(); + + + virtual bool handle(TrexStatelessDpCore *dp_core); + + +private: + uint8_t m_port_id; +}; + + +class TrexStatelessDpResume : public TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessDpResume(uint8_t port_id) : m_port_id(port_id) { + } + + + virtual TrexStatelessCpToDpMsgBase * clone(); + + + virtual bool handle(TrexStatelessDpCore *dp_core); + + +private: + uint8_t m_port_id; +}; + + +/** + * a message to stop traffic + * + * @author imarom (27-Oct-15) + */ +class TrexStatelessDpStop : public TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessDpStop(uint8_t port_id) : m_port_id(port_id) { + m_stop_only_for_event_id=false; + m_event_id=0; + m_core = NULL; + } + + virtual TrexStatelessCpToDpMsgBase * clone(); + + + virtual bool handle(TrexStatelessDpCore *dp_core); + + void set_core_ptr(CFlowGenListPerThread * core){ + m_core = core; + } + + CFlowGenListPerThread * get_core_ptr(){ + return ( m_core); + } + + + void set_event_id(int event_id){ + m_event_id = event_id; + } + + void set_wait_for_event_id(bool wait){ + m_stop_only_for_event_id = wait; + } + + virtual void on_node_remove(); + + + bool get_is_stop_by_event_id(){ + return (m_stop_only_for_event_id); + } + + int get_event_id(){ + return (m_event_id); + } + +private: + uint8_t m_port_id; + bool m_stop_only_for_event_id; + int m_event_id; + CFlowGenListPerThread * m_core ; + +}; + +/** + * a message to Quit the datapath traffic. support only stateless for now + * + * @author hhaim + */ +class TrexStatelessDpQuit : public TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessDpQuit() { + } + + + virtual TrexStatelessCpToDpMsgBase * clone(); + + virtual bool handle(TrexStatelessDpCore *dp_core); + +}; + +/** + * a message to check if both port are idel and exit + * + * @author hhaim + */ +class TrexStatelessDpCanQuit : public TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessDpCanQuit() { + } + + virtual bool handle(TrexStatelessDpCore *dp_core); + + virtual TrexStatelessCpToDpMsgBase * clone(); +}; + + +/** + * update message + */ +class TrexStatelessDpUpdate : public TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessDpUpdate(uint8_t port_id, double factor) { + m_port_id = port_id; + m_factor = factor; + } + + virtual bool handle(TrexStatelessDpCore *dp_core); + + virtual TrexStatelessCpToDpMsgBase * clone(); + +private: + uint8_t m_port_id; + double m_factor; +}; + + +/************************* messages from DP to CP **********************/ + +/** + * defines the base class for CP to DP messages + * + * @author imarom (27-Oct-15) + */ +class TrexStatelessDpToCpMsgBase { +public: + + TrexStatelessDpToCpMsgBase() { + } + + virtual ~TrexStatelessDpToCpMsgBase() { + } + + /** + * virtual function to handle a message + * + */ + virtual bool handle() = 0; + + /* no copy constructor */ + TrexStatelessDpToCpMsgBase(TrexStatelessDpToCpMsgBase &) = delete; + +}; + + +/** + * a message indicating an event has happened on a port at the + * DP + * + */ +class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase { +public: + + TrexDpPortEventMsg(int thread_id, uint8_t port_id, TrexDpPortEvent::event_e type, int event_id) { + m_thread_id = thread_id; + m_port_id = port_id; + m_event_type = type; + m_event_id = event_id; + } + + virtual bool handle(); + + int get_thread_id() { + return m_thread_id; + } + + uint8_t get_port_id() { + return m_port_id; + } + + TrexDpPortEvent::event_e get_event_type() { + return m_event_type; + } + + int get_event_id() { + return m_event_id; + } + +private: + int m_thread_id; + uint8_t m_port_id; + TrexDpPortEvent::event_e m_event_type; + int m_event_id; + +}; + +#endif /* __TREX_STATELESS_MESSAGING_H__ */ + |