From 59a3b58d240661a2bc06c6ede473d2eda4eb5e55 Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 2 Mar 2016 11:05:51 +0200 Subject: TX barrier --- src/gtest/trex_stateless_gtest.cpp | 1 - src/internal_api/trex_platform_api.h | 5 + src/main_dpdk.cpp | 37 +++- src/rpc-server/trex_rpc_req_resp_server.cpp | 35 +--- src/rpc-server/trex_rpc_req_resp_server.h | 2 +- src/rpc-server/trex_rpc_server.cpp | 32 ++- src/rpc-server/trex_rpc_server_api.h | 2 +- src/sim/trex_sim.h | 1 - src/sim/trex_sim_stateless.cpp | 1 - src/stateless/cp/trex_dp_port_events.cpp | 217 ++++++++------------- src/stateless/cp/trex_dp_port_events.h | 121 +++--------- src/stateless/cp/trex_stateless.cpp | 2 +- src/stateless/cp/trex_stateless.h | 6 +- src/stateless/cp/trex_stateless_port.cpp | 108 +++++----- src/stateless/cp/trex_stateless_port.h | 16 +- src/stateless/dp/trex_stateless_dp_core.cpp | 36 ++-- src/stateless/dp/trex_stateless_dp_core.h | 3 +- .../messaging/trex_stateless_messaging.cpp | 20 +- src/stateless/messaging/trex_stateless_messaging.h | 30 ++- 19 files changed, 319 insertions(+), 356 deletions(-) (limited to 'src') diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp index 4cc40cdb..c3dfcb95 100644 --- a/src/gtest/trex_stateless_gtest.cpp +++ b/src/gtest/trex_stateless_gtest.cpp @@ -3206,7 +3206,6 @@ public: /* first the message must be an event */ TrexDpPortEventMsg *event = dynamic_cast(msg); EXPECT_TRUE(event != NULL); - EXPECT_TRUE(event->get_event_type() == TrexDpPortEvent::EVENT_STOP); EXPECT_TRUE(event->get_event_id() == m_event_id); EXPECT_TRUE(event->get_port_id() == 0); diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h index 7f7ca218..f6d7278e 100644 --- a/src/internal_api/trex_platform_api.h +++ b/src/internal_api/trex_platform_api.h @@ -148,6 +148,7 @@ public: virtual int del_rx_flow_stat_rule(uint8_t port_id, uint8_t type, uint16_t proto, uint16_t id) const = 0; virtual void set_promiscuous(uint8_t port_id, bool enabled) const = 0; virtual bool get_promiscuous(uint8_t port_id) const = 0; + virtual void flush_dp_messages() const = 0; virtual ~TrexPlatformApi() {} }; @@ -176,6 +177,7 @@ public: int del_rx_flow_stat_rule(uint8_t port_id, uint8_t type, uint16_t proto, uint16_t id) const; void set_promiscuous(uint8_t port_id, bool enabled) const; bool get_promiscuous(uint8_t port_id) const; + void flush_dp_messages() const; }; @@ -234,6 +236,9 @@ public: return false; } + void flush_dp_messages() const { + } + private: int m_dp_core_count; }; diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index af6efe1d..3404d6be 100644 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -2599,9 +2599,9 @@ private: /* send message to all dp cores */ int send_message_all_dp(TrexStatelessCpToDpMsgBase *msg); void check_for_dp_message_from_core(int thread_id); - void check_for_dp_messages(); - + public: + void check_for_dp_messages(); int start_master_statefull(); int start_master_stateless(); int run_in_core(virtual_thread_id_t virt_core_id); @@ -2640,7 +2640,7 @@ private: public: - void publish_async_data(); + void publish_async_data(bool sync_now); void publish_async_barrier(uint32_t key); void dump_stats(FILE *fd, @@ -2686,9 +2686,11 @@ private: CLatencyPktInfo m_latency_pkt; TrexPublisher m_zmq_publisher; CGlobalStats m_stats; + std::mutex m_cp_lock; public: TrexStateless *m_trex_stateless; + }; int CGlobalTRex::reset_counters(){ @@ -2738,6 +2740,7 @@ CGlobalTRex::check_for_dp_message_from_core(int thread_id) { */ void CGlobalTRex::check_for_dp_messages() { + /* for all the cores - check for a new message */ for (int i = 0; i < get_cores_tx(); i++) { check_for_dp_message_from_core(i); @@ -3070,10 +3073,10 @@ bool CGlobalTRex::Create(){ cfg.m_port_count = CGlobalInfo::m_options.m_expected_portd; cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg; - cfg.m_rpc_async_cfg = NULL; cfg.m_rpc_server_verbose = false; cfg.m_platform_api = new TrexDpdkPlatformApi(); cfg.m_publisher = &m_zmq_publisher; + cfg.m_global_lock = &m_cp_lock; m_trex_stateless = new TrexStateless(cfg); } @@ -3531,9 +3534,15 @@ void CGlobalTRex::dump_stats(FILE *fd, CGlobalStats::DumpFormat format){ void -CGlobalTRex::publish_async_data() { +CGlobalTRex::publish_async_data(bool sync_now) { std::string json; + /* refactor to update, dump, and etc. */ + if (sync_now) { + update_stats(); + get_stats(m_stats); + } + m_stats.dump_json(json); m_zmq_publisher.publish_json(json); @@ -3572,7 +3581,7 @@ CGlobalTRex::publish_async_barrier(uint32_t key) { } int CGlobalTRex::run_in_master() { - + bool was_stopped=false; @@ -3580,6 +3589,9 @@ int CGlobalTRex::run_in_master() { m_trex_stateless->launch_control_plane(); } + /* exception and scope safe */ + std::unique_lock cp_lock(m_cp_lock); + while ( true ) { if ( CGlobalInfo::m_options.preview.get_no_keyboard() ==false ){ @@ -3669,18 +3681,23 @@ int CGlobalTRex::run_in_master() { } /* publish data */ - publish_async_data(); + publish_async_data(false); /* check from messages from DP */ check_for_dp_messages(); + cp_lock.unlock(); delay(500); + cp_lock.lock(); if ( is_all_cores_finished() ) { break; } } + /* on exit release the lock */ + cp_lock.unlock(); + if (!is_all_cores_finished()) { /* probably CLTR-C */ try_stop_all_dp(); @@ -5177,7 +5194,7 @@ TrexDpdkPlatformApi::get_interface_info(uint8_t interface_id, intf_info_st &info void TrexDpdkPlatformApi::publish_async_data_now(uint32_t key) const { - g_trex.publish_async_data(); + g_trex.publish_async_data(true); g_trex.publish_async_barrier(key); } @@ -5213,4 +5230,6 @@ bool TrexDpdkPlatformApi::get_promiscuous(uint8_t port_id) const { return g_trex.m_ports[port_id].get_promiscuous(); } - +void TrexDpdkPlatformApi::flush_dp_messages() const { + g_trex.check_for_dp_messages(); +} diff --git a/src/rpc-server/trex_rpc_req_resp_server.cpp b/src/rpc-server/trex_rpc_req_resp_server.cpp index da7e8c55..5c587e0f 100644 --- a/src/rpc-server/trex_rpc_req_resp_server.cpp +++ b/src/rpc-server/trex_rpc_req_resp_server.cpp @@ -173,10 +173,8 @@ void TrexRpcServerReqRes::process_request_raw(const std::string &request, std::s int index = 0; - /* if lock was provided, take it */ - if (m_lock) { - m_lock->lock(); - } + /* expcetion safe */ + std::unique_lock lock(*m_lock); /* for every command parsed - launch it */ for (auto command : commands) { @@ -190,9 +188,7 @@ void TrexRpcServerReqRes::process_request_raw(const std::string &request, std::s } /* done with the lock */ - if (m_lock) { - m_lock->unlock(); - } + lock.unlock(); /* write the JSON to string and sever on ZMQ */ @@ -254,28 +250,3 @@ TrexRpcServerReqRes::test_inject_request(const std::string &req) { return response; } - -/** - * MOCK req resp server - */ -TrexRpcServerReqResMock::TrexRpcServerReqResMock(const TrexRpcServerConfig &cfg) : TrexRpcServerReqRes(cfg) { -} - -/** - * override start - * - */ -void -TrexRpcServerReqResMock::start() { - -} - - -/** - * override stop - */ -void -TrexRpcServerReqResMock::stop() { - -} - diff --git a/src/rpc-server/trex_rpc_req_resp_server.h b/src/rpc-server/trex_rpc_req_resp_server.h index 979bf9af..26b3248f 100644 --- a/src/rpc-server/trex_rpc_req_resp_server.h +++ b/src/rpc-server/trex_rpc_req_resp_server.h @@ -55,7 +55,6 @@ protected: void *m_socket; }; - /** * a mock req resp server (for tests) * @@ -73,5 +72,6 @@ public: }; + #endif /* __TREX_RPC_REQ_RESP_API_H__ */ diff --git a/src/rpc-server/trex_rpc_server.cpp b/src/rpc-server/trex_rpc_server.cpp index 1dfc4494..7d2e31a5 100644 --- a/src/rpc-server/trex_rpc_server.cpp +++ b/src/rpc-server/trex_rpc_server.cpp @@ -33,6 +33,9 @@ limitations under the License. TrexRpcServerInterface::TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name, std::mutex *lock) : m_cfg(cfg), m_name(name), m_lock(lock) { m_is_running = false; m_is_verbose = false; + if (m_lock == NULL) { + m_lock = &m_dummy_lock; + } } TrexRpcServerInterface::~TrexRpcServerInterface() { @@ -117,7 +120,6 @@ get_current_date_time() { const std::string TrexRpcServer::s_server_uptime = get_current_date_time(); TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg, - const TrexRpcServerConfig *async_cfg, std::mutex *lock) { m_req_resp = NULL; @@ -134,10 +136,6 @@ TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg, m_servers.push_back(m_req_resp); } - /* add async publisher */ - if (async_cfg) { - m_servers.push_back(new TrexRpcServerAsync(*async_cfg, lock)); - } } TrexRpcServer::~TrexRpcServer() { @@ -187,3 +185,27 @@ TrexRpcServer::test_inject_request(const std::string &req_str) { return ""; } } + +/** + * MOCK req resp server + */ +TrexRpcServerReqResMock::TrexRpcServerReqResMock(const TrexRpcServerConfig &cfg) : TrexRpcServerReqRes(cfg) { +} + +/** + * override start + * + */ +void +TrexRpcServerReqResMock::start() { + +} + + +/** + * override stop + */ +void +TrexRpcServerReqResMock::stop() { + +} diff --git a/src/rpc-server/trex_rpc_server_api.h b/src/rpc-server/trex_rpc_server_api.h index 1ab5dce9..a02b2cc0 100644 --- a/src/rpc-server/trex_rpc_server_api.h +++ b/src/rpc-server/trex_rpc_server_api.h @@ -133,6 +133,7 @@ protected: std::thread *m_thread; std::string m_name; std::mutex *m_lock; + std::mutex m_dummy_lock; }; /** @@ -147,7 +148,6 @@ public: /* creates the collection of servers using configurations */ TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg, - const TrexRpcServerConfig *async_cfg, std::mutex *m_lock = NULL); ~TrexRpcServer(); diff --git a/src/sim/trex_sim.h b/src/sim/trex_sim.h index 3a3a62ea..59184b75 100644 --- a/src/sim/trex_sim.h +++ b/src/sim/trex_sim.h @@ -77,7 +77,6 @@ public: cfg.m_port_count = 2; cfg.m_rpc_req_resp_cfg = NULL; - cfg.m_rpc_async_cfg = NULL; cfg.m_rpc_server_verbose = false; cfg.m_platform_api = new SimPlatformApi(1); cfg.m_publisher = NULL; diff --git a/src/sim/trex_sim_stateless.cpp b/src/sim/trex_sim_stateless.cpp index 30d60b15..87c61ae2 100644 --- a/src/sim/trex_sim_stateless.cpp +++ b/src/sim/trex_sim_stateless.cpp @@ -186,7 +186,6 @@ SimStateless::prepare_control_plane() { cfg.m_port_count = m_port_count; cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg; - cfg.m_rpc_async_cfg = NULL; cfg.m_rpc_server_verbose = false; cfg.m_platform_api = new SimPlatformApi(m_dp_core_count); cfg.m_publisher = m_publisher; diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp index ba327e59..8e098adf 100644 --- a/src/stateless/cp/trex_dp_port_events.cpp +++ b/src/stateless/cp/trex_dp_port_events.cpp @@ -20,6 +20,7 @@ limitations under the License. */ #include +#include #include #include #include @@ -27,24 +28,20 @@ limitations under the License. /** * port events */ -void -TrexDpPortEvents::create(TrexStatelessPort *port) { +TrexDpPortEvents::TrexDpPortEvents(TrexStatelessPort *port) { m_port = port; - - for (int i = 0; i < TrexDpPortEvent::EVENT_MAX; i++) { - m_events[i].create((TrexDpPortEvent::event_e) i, port); - } - m_event_id_counter = EVENT_ID_INVALID; } -/** - * generate a new event ID - * - */ -int -TrexDpPortEvents::generate_event_id() { - return (++m_event_id_counter); +TrexDpPortEvent * +TrexDpPortEvents::lookup(int event_id) { + auto search = m_events.find(event_id); + + if (search != m_events.end()) { + return search->second; + } else { + return NULL; + } } /** @@ -52,21 +49,49 @@ TrexDpPortEvents::generate_event_id() { * all other events will be disabled * */ -void -TrexDpPortEvents::wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms) { +int +TrexDpPortEvents::create_event(TrexDpPortEvent *event, int timeout_ms) { + /* allocate ID for event */ + int event_id = ++m_event_id_counter; - /* first disable all events */ - for (TrexDpPortEvent & e : m_events) { - e.disable(); - } + /* init and add */ + event->init(m_port, event_id, timeout_ms); + m_events[event_id] = event; - /* mark this event as allowed */ - m_events[ev].wait_for_event(event_id, timeout_ms); + return event_id; } void -TrexDpPortEvents::disable(TrexDpPortEvent::event_e ev) { - m_events[ev].disable(); +TrexDpPortEvents::destroy_event(int event_id) { + TrexDpPortEvent *event = lookup(event_id); + if (!event) { + /* cannot find event */ + throw TrexException("internal error - cannot find event"); + } + + m_events.erase(event_id); + delete event; +} + +class DPBarrier : public TrexDpPortEvent { +protected: + virtual void on_event() { + /* do nothing */ + } +}; + +void +TrexDpPortEvents::barrier() { + int barrier_id = create_event(new DPBarrier()); + + TrexStatelessCpToDpMsgBase *barrier_msg = new TrexStatelessDpBarrier(m_port->m_port_id, barrier_id); + m_port->send_message_to_all_dp(barrier_msg); + + get_stateless_obj()->get_platform_api()->flush_dp_messages(); + while (lookup(barrier_id) != NULL) { + delay(1); + get_stateless_obj()->get_platform_api()->flush_dp_messages(); + } } /** @@ -74,39 +99,33 @@ TrexDpPortEvents::disable(TrexDpPortEvent::event_e ev) { * */ void -TrexDpPortEvents::handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id) { - m_events[ev].handle_event(thread_id, event_id); -} - -/*********** - * single event object - * - */ +TrexDpPortEvents::on_core_reporting_in(int event_id, int thread_id) { + TrexDpPortEvent *event = lookup(event_id); + /* event might have been deleted */ + if (!event) { + return; + } -void -TrexDpPortEvent::create(event_e type, TrexStatelessPort *port) { - m_event_type = type; - m_port = port; + bool done = event->on_core_reporting_in(thread_id); - /* add the core ids to the hash */ - m_signal.clear(); - for (int core_id : m_port->get_core_id_list()) { - m_signal[core_id] = false; + if (done) { + destroy_event(event_id); } - - /* event is disabled */ - disable(); } -/** - * wait the event using event id and timeout +/*************************** + * event * - */ -void -TrexDpPortEvent::wait_for_event(int event_id, int timeout_ms) { + **************************/ +TrexDpPortEvent::TrexDpPortEvent() { + m_port = NULL; + m_event_id = -1; +} - /* set a new event id */ +void +TrexDpPortEvent::init(TrexStatelessPort *port, int event_id, int timeout_ms) { + m_port = port; m_event_id = event_id; /* do we have a timeout ? */ @@ -118,103 +137,33 @@ TrexDpPortEvent::wait_for_event(int event_id, int timeout_ms) { /* prepare the signal array */ m_pending_cnt = 0; - for (auto & core_pair : m_signal) { - core_pair.second = false; + for (int core_id : m_port->get_core_id_list()) { + m_signal[core_id] = false; m_pending_cnt++; } } -void -TrexDpPortEvent::disable() { - m_event_id = TrexDpPortEvents::EVENT_ID_INVALID; -} - -/** - * get the event status - * - */ - -TrexDpPortEvent::event_status_e -TrexDpPortEvent::status() { - - /* is it even active ? */ - if (m_event_id == TrexDpPortEvents::EVENT_ID_INVALID) { - return (EVENT_DISABLE); - } - - /* did it occured ? */ - if (m_pending_cnt == 0) { - return (EVENT_OCCURED); - } - - /* so we are enabled and the event did not occur - maybe we timed out ? */ - if ( (m_expire_limit_ms > 0) && (os_get_time_msec() > m_expire_limit_ms) ) { - return (EVENT_TIMED_OUT); - } - - /* so we are still waiting... */ - return (EVENT_PENDING); - -} - -void -TrexDpPortEvent::err(int thread_id, int event_id, const std::string &err_msg) { - std::stringstream err; - err << "DP event '" << event_name(m_event_type) << "' on thread id '" << thread_id << "' with key '" << event_id <<"' - "; -} - -/** - * event occured - * - */ -void -TrexDpPortEvent::handle_event(int thread_id, int event_id) { - - /* if the event is disabled - we don't care */ - if (!is_active()) { - return; - } - - /* check the event id is matching the required event - if not maybe its an old signal */ - if (event_id != m_event_id) { - return; - } - +bool +TrexDpPortEvent::on_core_reporting_in(int thread_id) { /* mark sure no double signal */ if (m_signal.at(thread_id)) { - err(thread_id, event_id, "double signal"); + std::stringstream err; + err << "double signal detected on event id: " << m_event_id; + throw TrexException(err.str()); - } else { - /* mark */ - m_signal.at(thread_id) = true; - m_pending_cnt--; } + /* mark */ + m_signal.at(thread_id) = true; + m_pending_cnt--; + /* event occured */ if (m_pending_cnt == 0) { - m_port->on_dp_event_occured(m_event_type); - m_event_id = TrexDpPortEvents::EVENT_ID_INVALID; + on_event(); + return true; + } else { + return false; } } -bool -TrexDpPortEvent::is_active() { - return (status() != EVENT_DISABLE); -} - -bool -TrexDpPortEvent::has_timeout_expired() { - return (status() == EVENT_TIMED_OUT); -} - -const char * -TrexDpPortEvent::event_name(event_e type) { - switch (type) { - case EVENT_STOP: - return "DP STOP"; - default: - throw TrexException("unknown event type"); - } - -} diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h index 557e590b..3b8c8633 100644 --- a/src/stateless/cp/trex_dp_port_events.h +++ b/src/stateless/cp/trex_dp_port_events.h @@ -25,95 +25,43 @@ limitations under the License. #include class TrexStatelessPort; +class TrexDpPortEvents; /** - * describes a single DP event related to port + * interface class for DP events * - * @author imarom (18-Nov-15) + * @author imarom (29-Feb-16) */ class TrexDpPortEvent { -public: - - enum event_e { - EVENT_STOP = 1, - EVENT_MAX - }; - - /** - * status of the event for the port - */ - enum event_status_e { - EVENT_DISABLE, - EVENT_PENDING, - EVENT_TIMED_OUT, - EVENT_OCCURED - }; - - /** - * init for the event - * - */ - void create(event_e type, TrexStatelessPort *port); - - /** - * create a new pending event - * - */ - void wait_for_event(int event_id, int timeout_ms = -1); - - /** - * mark event as not allowed to happen - * - */ - void disable(); - - /** - * get the event status - * - */ - event_status_e status(); - - /** - * event occured - * - */ - void handle_event(int thread_id, int event_id); + friend TrexDpPortEvents; - /** - * returns true if event is active - * - */ - bool is_active(); - - /** - * has timeout already expired ? - * - */ - bool has_timeout_expired(); - - /** - * generate error - * - */ - void err(int thread_id, int event_id, const std::string &err_msg); +public: + TrexDpPortEvent(); + virtual ~TrexDpPortEvent() {} +protected: /** - * event to name + * what to do when an event has been completed (all cores + * reported in * + * @author imarom (29-Feb-16) */ - static const char * event_name(event_e type); + virtual void on_event() = 0; + TrexStatelessPort *get_port() { + return m_port; + } private: + void init(TrexStatelessPort *port, int event_id, int timeout_ms); + bool on_core_reporting_in(int thread_id); - event_e m_event_type; std::unordered_map m_signal; int m_pending_cnt; TrexStatelessPort *m_port; int m_event_id; int m_expire_limit_ms; - }; /** @@ -124,44 +72,39 @@ class TrexDpPortEvents { public: friend class TrexDpPortEvent; - void create(TrexStatelessPort *port); + static const int INVALID_ID = -1; - /** - * generate a new event ID to be used with wait_for_event - * - */ - int generate_event_id(); + TrexDpPortEvents(TrexStatelessPort *port); /** * wait a new DP event on the port * returns a key which will be used to identify * the event happened * - * @author imarom (18-Nov-15) - * - * @param ev - type of event - * @param event_id - a unique identifier for the event - * @param timeout_ms - does it has a timeout ? - * */ - void wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms = -1); + int create_event(TrexDpPortEvent *event, int timeout_ms = -1); /** - * disable an event (don't care) + * destroy an event * */ - void disable(TrexDpPortEvent::event_e ev); + void destroy_event(int event_id); /** - * event has occured - * + * return when all DP cores have responsed on a barrier + */ + void barrier(); + + /** + * a core has reached the event */ - void handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id); + void on_core_reporting_in(int event_id, int thread_id); private: - static const int EVENT_ID_INVALID = -1; + TrexDpPortEvent *lookup(int event_id); - TrexDpPortEvent m_events[TrexDpPortEvent::EVENT_MAX]; + static const int EVENT_ID_INVALID = -1; + std::unordered_map m_events; int m_event_id_counter; TrexStatelessPort *m_port; diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index a4522837..9e24802b 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -40,7 +40,7 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) { /* create RPC servers */ /* set both servers to mutex each other */ - m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_rpc_async_cfg, &m_global_cp_lock); + m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_global_lock); m_rpc_server->set_verbose(cfg.m_rpc_server_verbose); /* configure ports */ diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index cc47da6b..6e5e0c44 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -92,18 +92,19 @@ public: TrexStatelessCfg() { m_port_count = 0; m_rpc_req_resp_cfg = NULL; - m_rpc_async_cfg = NULL; m_rpc_server_verbose = false; m_platform_api = NULL; m_publisher = NULL; + m_global_lock = NULL; } const TrexRpcServerConfig *m_rpc_req_resp_cfg; - const TrexRpcServerConfig *m_rpc_async_cfg; const TrexPlatformApi *m_platform_api; bool m_rpc_server_verbose; uint8_t m_port_count; TrexPublisher *m_publisher; + std::mutex *m_global_lock; + }; /** @@ -186,7 +187,6 @@ protected: TrexPublisher *m_publisher; - std::mutex m_global_cp_lock; }; /** diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index c60b0e85..257af546 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -48,11 +48,35 @@ port_id_to_cores(uint8_t port_id, std::vector> &core using namespace std; + + +/*************************** + * trex DP events handlers + * + **************************/ +class AsyncStopEvent : public TrexDpPortEvent { + +protected: + /** + * when an async event occurs (all cores have reported in) + * + * @author imarom (29-Feb-16) + */ + virtual void on_event() { + get_port()->change_state(TrexStatelessPort::PORT_STATE_STREAMS); + + get_port()->common_port_stop_actions(true); + + assert(get_port()->m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID); + get_port()->m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID; + } +}; + /*************************** * trex stateless port * **************************/ -TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) { +TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) { std::vector> core_pair_list; m_port_id = port_id; @@ -73,16 +97,20 @@ TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api m_cores_id_list.push_back(core_pair.first); } - /* init the events DP DB */ - m_dp_events.create(this); - m_graph_obj = NULL; + + m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID; } TrexStatelessPort::~TrexStatelessPort() { if (m_graph_obj) { delete m_graph_obj; } + + if (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) { + m_dp_events.destroy_event(m_pending_async_stop_event); + m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID; + } } /** @@ -170,16 +198,14 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration, } /* generate a message to all the relevant DP cores to start transmitting */ - int event_id = m_dp_events.generate_event_id(); - - /* mark that DP event of stoppped is possible */ - m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, event_id); - + assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID); + m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent()); /* update object status */ m_factor = factor; m_last_all_streams_continues = compiled_objs[0]->get_all_streams_continues(); m_last_duration = duration; + change_state(PORT_STATE_TX); /* update the DP - messages will be freed by the DP */ @@ -188,17 +214,23 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration, /* was the core assigned a compiled object ? */ if (compiled_objs[index]) { - TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_objs[index], duration); + TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, + m_pending_async_stop_event, + compiled_objs[index], + duration); send_message_to_dp(core_id, start_msg); } else { /* mimic an end event */ - m_dp_events.handle_event(TrexDpPortEvent::EVENT_STOP, core_id, event_id); + m_dp_events.on_core_reporting_in(m_pending_async_stop_event, core_id); } index++; } + /* for debug - this can be turn on */ + //m_dp_events.barrier(); + /* update subscribers */ Json::Value data; data["port_id"] = m_port_id; @@ -225,17 +257,23 @@ TrexStatelessPort::stop_traffic(void) { /* delete any previous graphs */ delete_streams_graph(); - /* mask out the DP stop event */ - m_dp_events.disable(TrexDpPortEvent::EVENT_STOP); + /* to avoid race, first destroy any previous stop/pause events */ + if (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) { + m_dp_events.destroy_event(m_pending_async_stop_event); + m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID; + } + /* generate a message to all the relevant DP cores to start transmitting */ TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id); - send_message_to_all_dp(stop_msg); - /* continue to general actions */ + /* a barrier - make sure all the DP cores stopped */ + m_dp_events.barrier(); + + change_state(PORT_STATE_STREAMS); + common_port_stop_actions(false); - } /** @@ -243,14 +281,12 @@ TrexStatelessPort::stop_traffic(void) { * */ void -TrexStatelessPort::common_port_stop_actions(bool event_triggered) { +TrexStatelessPort::common_port_stop_actions(bool async) { - change_state(PORT_STATE_STREAMS); - Json::Value data; data["port_id"] = m_port_id; - if (event_triggered) { + if (async) { get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_FINISHED_TX, data); } else { get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data); @@ -274,17 +310,18 @@ TrexStatelessPort::pause_traffic(void) { throw TrexException(" pause is supported when duration is not enable is start command "); } + /* send a pause message */ TrexStatelessCpToDpMsgBase *pause_msg = new TrexStatelessDpPause(m_port_id); - send_message_to_all_dp(pause_msg); - change_state(PORT_STATE_PAUSE); + /* make sure all DP cores paused */ + m_dp_events.barrier(); - Json::Value data; - data["port_id"] = m_port_id; - get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_PAUSED, data); + /* change state */ + change_state(PORT_STATE_PAUSE); } + void TrexStatelessPort::resume_traffic(void) { @@ -294,7 +331,6 @@ TrexStatelessPort::resume_traffic(void) { TrexStatelessCpToDpMsgBase *resume_msg = new TrexStatelessDpResume(m_port_id); send_message_to_all_dp(resume_msg); - change_state(PORT_STATE_TX); @@ -335,7 +371,6 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul, bool force) { } TrexStatelessCpToDpMsgBase *update_msg = new TrexStatelessDpUpdate(m_port_id, factor); - send_message_to_all_dp(update_msg); m_factor *= factor; @@ -439,25 +474,6 @@ TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBas ring->Enqueue((CGenNode *)msg); } -/** - * when a DP (async) event occurs - handle it - * - */ -void -TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) { - Json::Value data; - - switch (event_type) { - - case TrexDpPortEvent::EVENT_STOP: - common_port_stop_actions(true); - break; - - default: - assert(0); - - } -} uint64_t TrexStatelessPort::get_port_speed_bps() const { diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index 192d0d19..d3c4dcb9 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -102,13 +102,17 @@ private: }; +class AsyncStopEvent; + /** * describes a stateless port * * @author imarom (31-Aug-15) */ class TrexStatelessPort { - friend class TrexDpPortEvent; + friend TrexDpPortEvents; + friend TrexDpPortEvent; + friend AsyncStopEvent; public: @@ -363,18 +367,12 @@ private: */ void send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg); - /** - * triggered when event occurs - * - */ - void on_dp_event_occured(TrexDpPortEvent::event_e event_type); - /** * when a port stops, perform various actions * */ - void common_port_stop_actions(bool event_triggered); + void common_port_stop_actions(bool async); /** * calculate effective M per core @@ -421,6 +419,8 @@ private: /* owner information */ TrexPortOwner m_owner; + + int m_pending_async_stop_event; }; diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index 0f578b99..19eface1 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -265,9 +265,9 @@ bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){ } -bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id, - bool stop_on_id, - int event_id){ +bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id, + bool stop_on_id, + int event_id){ if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) { @@ -829,9 +829,9 @@ TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) { void -TrexStatelessDpCore::stop_traffic(uint8_t port_id, - bool stop_on_id, - int event_id) { +TrexStatelessDpCore::stop_traffic(uint8_t port_id, + bool stop_on_id, + int event_id) { /* we cannot remove nodes not from the top of the queue so for every active node - make sure next time the scheduler invokes it, it will be free */ @@ -843,20 +843,19 @@ TrexStatelessDpCore::stop_traffic(uint8_t port_id, //printf(" skip .. %f\n",m_core->m_cur_time_sec); return; } - -#if 0 - if ( are_all_ports_idle() ) { - /* just a place holder if we will need to do somthing in that case */ - } -#endif /* inform the control plane we stopped - this might be a async stop (streams ended) - */ + */ + #if 0 + if ( are_all_ports_idle() ) { + /* just a place holder if we will need to do somthing in that case */ + } + #endif + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id); TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id, port_id, - TrexDpPortEvent::EVENT_STOP, lp_port->get_event_id()); ring->Enqueue((CGenNode *)event_msg); @@ -872,3 +871,12 @@ TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) { delete msg; } +void +TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) { + + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id); + TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id, + port_id, + event_id); + ring->Enqueue((CGenNode *)event_msg); +} diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h index 3d214655..cb102b8d 100644 --- a/src/stateless/dp/trex_stateless_dp_core.h +++ b/src/stateless/dp/trex_stateless_dp_core.h @@ -235,7 +235,8 @@ public: return (&m_ports[local_port_id]); } - + /* simply sends a message back (acts as a barrier for previous messages) */ + void barrier(uint8_t port_id, int event_id); private: diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index 257de168..333aec88 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -180,11 +180,29 @@ TrexStatelessDpUpdate::clone() { return new_msg; } +/************************* + barrier message + ************************/ + +bool +TrexStatelessDpBarrier::handle(TrexStatelessDpCore *dp_core) { + dp_core->barrier(m_port_id, m_event_id); + return true; +} + +TrexStatelessCpToDpMsgBase * +TrexStatelessDpBarrier::clone() { + + TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpBarrier(m_port_id, m_event_id); + + return new_msg; +} + /************************* messages from DP to CP **********************/ bool TrexDpPortEventMsg::handle() { TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(m_port_id); - port->get_dp_events().handle_event(m_event_type, m_thread_id, m_event_id); + port->get_dp_events().on_core_reporting_in(m_event_id, m_thread_id); return (true); } diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index d56596bf..dda086b7 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -145,7 +145,7 @@ public: TrexStatelessDpStop(uint8_t port_id) : m_port_id(port_id) { m_stop_only_for_event_id=false; - m_event_id=0; + m_event_id = 0; m_core = NULL; } @@ -245,6 +245,26 @@ private: double m_factor; }; +/** + * barrier message for DP core + * + */ +class TrexStatelessDpBarrier : public TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessDpBarrier(uint8_t port_id, int event_id) { + m_port_id = port_id; + m_event_id = event_id; + } + + virtual bool handle(TrexStatelessDpCore *dp_core); + + virtual TrexStatelessCpToDpMsgBase * clone(); + +private: + uint8_t m_port_id; + int m_event_id; +}; /************************* messages from DP to CP **********************/ @@ -282,10 +302,9 @@ public: class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase { public: - TrexDpPortEventMsg(int thread_id, uint8_t port_id, TrexDpPortEvent::event_e type, int event_id) { + TrexDpPortEventMsg(int thread_id, uint8_t port_id, int event_id) { m_thread_id = thread_id; m_port_id = port_id; - m_event_type = type; m_event_id = event_id; } @@ -299,10 +318,6 @@ public: return m_port_id; } - TrexDpPortEvent::event_e get_event_type() { - return m_event_type; - } - int get_event_id() { return m_event_id; } @@ -310,7 +325,6 @@ public: private: int m_thread_id; uint8_t m_port_id; - TrexDpPortEvent::event_e m_event_type; int m_event_id; }; -- cgit 1.2.3-korg