From 07e6795a7497151e0920c82337cca6cfb5c3c3cd Mon Sep 17 00:00:00 2001 From: imarom Date: Tue, 17 Nov 2015 15:26:41 +0200 Subject: checkpoint before merge --- src/stateless/dp/trex_stateless_dp_core.cpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'src/stateless') diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index eabd6fdb..07e03678 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -248,7 +248,13 @@ TrexStatelessDpCore::stop_traffic(uint8_t port_id) { m_core->m_node_gen.add_node(node); } - + + /* send a message to the control plane to + generate an async event that traffic has stopped + */ + //CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id); + //ring->Enqueue((CGenNode *)msg->clone()); + } /** -- cgit From a7317d45787669af71ca8c65fd1e51f8a47d2c1e Mon Sep 17 00:00:00 2001 From: imarom Date: Thu, 19 Nov 2015 12:35:16 +0200 Subject: async events (DP to CP) --- src/stateless/cp/trex_stateless.cpp | 2 +- src/stateless/cp/trex_stateless_port.cpp | 56 +++++++++++++--- src/stateless/cp/trex_stateless_port.h | 38 +++++++++-- src/stateless/cp/trex_streams_compiler.h | 4 ++ src/stateless/dp/trex_stateless_dp_core.cpp | 13 ++-- src/stateless/dp/trex_stateless_dp_core.h | 9 +++ .../messaging/trex_stateless_messaging.cpp | 23 ++++++- src/stateless/messaging/trex_stateless_messaging.h | 76 ++++++++++++++++++---- 8 files changed, 188 insertions(+), 33 deletions(-) (limited to 'src/stateless') diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index e0e95450..6ef24a7b 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -47,7 +47,7 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) { m_port_count = cfg.m_port_count; for (int i = 0; i < m_port_count; i++) { - m_ports.push_back(new TrexStatelessPort(i)); + m_ports.push_back(new TrexStatelessPort(i, cfg.m_platform_api)); } m_platform_api = cfg.m_platform_api; diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index cbc5a328..13d0fc9f 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -52,9 +52,25 @@ using namespace std; * trex stateless port * **************************/ -TrexStatelessPort::TrexStatelessPort(uint8_t port_id) : m_port_id(port_id) { +TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) { + std::vector> core_pair_list; + + m_port_id = port_id; + m_port_state = PORT_STATE_IDLE; clear_owner(); + + /* get the DP cores belonging to this port */ + api->port_id_to_cores(m_port_id, core_pair_list); + + for (auto core_pair : core_pair_list) { + + /* send the core id */ + m_cores_id_list.push_back(core_pair.first); + } + + /* init the events DP DB */ + m_dp_events.create(this); } @@ -105,11 +121,16 @@ TrexStatelessPort::start_traffic(double mul, double duration) { } /* generate a message to all the relevant DP cores to start transmitting */ - TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(compiled_obj, duration); + m_event_id = m_dp_events.generate_event_id(); + TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_event_id, compiled_obj, duration); + + change_state(PORT_STATE_TX); send_message_to_dp(start_msg); - change_state(PORT_STATE_TX); + /* mark that DP event of stoppped is possible */ + m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, m_event_id); + } /** @@ -279,15 +300,32 @@ TrexStatelessPort::encode_stats(Json::Value &port) { void TrexStatelessPort::send_message_to_dp(TrexStatelessCpToDpMsgBase *msg) { - std::vector> cores_id_list; - - get_stateless_obj()->get_platform_api()->port_id_to_cores(m_port_id, cores_id_list); - - for (auto core_pair : cores_id_list) { + for (auto core_id : m_cores_id_list) { /* send the message to the core */ - CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_pair.first); + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id); ring->Enqueue((CGenNode *)msg->clone()); } } + +/** + * when a DP (async) event occurs - handle it + * + */ +void +TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) { + switch (event_type) { + + case TrexDpPortEvent::EVENT_STOP: + /* set a stop event */ + change_state(PORT_STATE_STREAMS); + /* send a ZMQ event */ + break; + + default: + assert(0); + + } + printf("hey"); +} diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index b533f793..da75284e 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -22,7 +22,9 @@ limitations under the License. #define __TREX_STATELESS_PORT_H__ #include +#include +class TrexPlatformApi; class TrexStatelessCpToDpMsgBase; /** @@ -31,6 +33,8 @@ class TrexStatelessCpToDpMsgBase; * @author imarom (31-Aug-15) */ class TrexStatelessPort { + friend class TrexDpPortEvent; + public: /** @@ -54,7 +58,7 @@ public: RC_ERR_FAILED_TO_COMPILE_STREAMS }; - TrexStatelessPort(uint8_t port_id); + TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api); /** * acquire port @@ -199,6 +203,10 @@ public: m_stream_table.get_object_list(object_list); } + TrexDpPortEvents & get_dp_events() { + return m_dp_events; + } + private: @@ -224,6 +232,10 @@ private: } + const std::vector get_core_id_list () { + return m_cores_id_list; + } + bool verify_state(int state, bool should_throw = true) const; void change_state(port_state_e new_state); @@ -232,11 +244,25 @@ private: void send_message_to_dp(TrexStatelessCpToDpMsgBase *msg); - TrexStreamTable m_stream_table; - uint8_t m_port_id; - port_state_e m_port_state; - std::string m_owner; - std::string m_owner_handler; + /** + * triggered when event occurs + * + */ + void on_dp_event_occured(TrexDpPortEvent::event_e event_type); + + + TrexStreamTable m_stream_table; + uint8_t m_port_id; + port_state_e m_port_state; + std::string m_owner; + std::string m_owner_handler; + + /* holds the DP cores associated with this port */ + //std::vector> m_cores_id_list; + std::vector m_cores_id_list; + + TrexDpPortEvents m_dp_events; + int m_event_id; }; #endif /* __TREX_STATELESS_PORT_H__ */ diff --git a/src/stateless/cp/trex_streams_compiler.h b/src/stateless/cp/trex_streams_compiler.h index 42cfc5b8..c80dddef 100644 --- a/src/stateless/cp/trex_streams_compiler.h +++ b/src/stateless/cp/trex_streams_compiler.h @@ -60,6 +60,10 @@ public: return (m_mul); } + uint8_t get_port_id() { + return m_port_id; + } + private: void add_compiled_stream(TrexStream * stream); std::vector m_objs; diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index 0747c1a0..4a74d9e5 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -227,6 +227,7 @@ TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj, double duration) if ( duration > 0.0 ){ add_duration( duration ); } + } void @@ -261,11 +262,15 @@ TrexStatelessDpCore::stop_traffic(uint8_t port_id) { m_core->m_node_gen.add_node(node); } - /* send a message to the control plane to - generate an async event that traffic has stopped + /* inform the control plane we stopped - this might be a async stop + (streams ended) */ - //CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id); - //ring->Enqueue((CGenNode *)msg->clone()); + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id); + TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id, + port_id, + TrexDpPortEvent::EVENT_STOP, + get_event_id()); + ring->Enqueue((CGenNode *)event_msg); } diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h index aaa6eed3..d07e1d3a 100644 --- a/src/stateless/dp/trex_stateless_dp_core.h +++ b/src/stateless/dp/trex_stateless_dp_core.h @@ -112,6 +112,14 @@ public: /* quit the main loop, work in both stateless in stateful, don't free memory trigger from master */ void quit_main_loop(); + void set_event_id(int event_id) { + m_event_id = event_id; + } + + int get_event_id() { + return m_event_id; + } + private: /** * in idle state loop, the processor most of the time sleeps @@ -152,6 +160,7 @@ private: CFlowGenListPerThread *m_core; double m_duration; + int m_event_id; }; #endif /* __TREX_STATELESS_DP_CORE_H__ */ diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index 2e3acffd..a2d00f8b 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -21,12 +21,17 @@ limitations under the License. #include #include #include +#include + #include /************************* start traffic message ************************/ -TrexStatelessDpStart::TrexStatelessDpStart(TrexStreamsCompiledObj *obj, double duration) : m_obj(obj), m_duration(duration) { +TrexStatelessDpStart::TrexStatelessDpStart(int event_id, TrexStreamsCompiledObj *obj, double duration) { + m_event_id = event_id; + m_obj = obj; + m_duration = duration; } @@ -39,7 +44,7 @@ TrexStatelessDpStart::clone() { TrexStreamsCompiledObj *new_obj = m_obj->clone(); - TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpStart(new_obj, m_duration); + TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpStart(m_event_id, new_obj, m_duration); return new_msg; } @@ -53,7 +58,12 @@ TrexStatelessDpStart::~TrexStatelessDpStart() { bool TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) { + /* mark the event id for DP response */ + dp_core->set_event_id(m_event_id); + + /* staet traffic */ dp_core->start_traffic(m_obj, m_duration); + return true; } @@ -96,3 +106,12 @@ bool TrexStatelessDpQuit::handle(TrexStatelessDpCore *dp_core){ return (true); } + +/************************* messages from DP to CP **********************/ +bool +TrexDpPortEventMsg::handle() { + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(m_port_id); + port->get_dp_events().handle_event(m_event_type, m_thread_id, m_event_id); + + return (true); +} diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 6473a6a4..3fb1ef84 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -23,6 +23,7 @@ limitations under the License. #define __TREX_STATELESS_MESSAGING_H__ #include +#include class TrexStatelessDpCore; class TrexStreamsCompiledObj; @@ -41,12 +42,8 @@ public: virtual ~TrexStatelessCpToDpMsgBase() { } - /** - * virtual function to handle a message - * - */ - virtual bool handle(TrexStatelessDpCore *dp_core) = 0; + virtual bool handle(TrexStatelessDpCore *dp_core) = 0; /** * clone the current message @@ -57,6 +54,8 @@ public: /* no copy constructor */ TrexStatelessCpToDpMsgBase(TrexStatelessCpToDpMsgBase &) = delete; +protected: + int m_event_id; }; /** @@ -67,16 +66,17 @@ public: class TrexStatelessDpStart : public TrexStatelessCpToDpMsgBase { public: - TrexStatelessDpStart(TrexStreamsCompiledObj *obj, double duration); + TrexStatelessDpStart(int m_event_id, TrexStreamsCompiledObj *obj, double duration); ~TrexStatelessDpStart(); - virtual bool handle(TrexStatelessDpCore *dp_core); - virtual TrexStatelessCpToDpMsgBase * clone(); + virtual bool handle(TrexStatelessDpCore *dp_core); private: + + int m_event_id; TrexStreamsCompiledObj *m_obj; double m_duration; }; @@ -92,10 +92,10 @@ public: TrexStatelessDpStop(uint8_t port_id) : m_port_id(port_id) { } - virtual bool handle(TrexStatelessDpCore *dp_core); - virtual TrexStatelessCpToDpMsgBase * clone(); + virtual bool handle(TrexStatelessDpCore *dp_core); + private: uint8_t m_port_id; }; @@ -111,10 +111,64 @@ public: TrexStatelessDpQuit() { } - virtual bool handle(TrexStatelessDpCore *dp_core); virtual TrexStatelessCpToDpMsgBase * clone(); + + virtual bool handle(TrexStatelessDpCore *dp_core); + +}; + +/************************* messages from DP to CP **********************/ + +/** + * defines the base class for CP to DP messages + * + * @author imarom (27-Oct-15) + */ +class TrexStatelessDpToCpMsgBase { +public: + + TrexStatelessDpToCpMsgBase() { + } + + virtual ~TrexStatelessDpToCpMsgBase() { + } + + /** + * virtual function to handle a message + * + */ + virtual bool handle() = 0; + + /* no copy constructor */ + TrexStatelessDpToCpMsgBase(TrexStatelessDpToCpMsgBase &) = delete; + }; +/** + * a message indicating an event has happened on a port at the + * DP + * + */ +class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase { +public: + + TrexDpPortEventMsg(int thread_id, uint8_t port_id, TrexDpPortEvent::event_e type, int event_id) { + m_thread_id = thread_id; + m_port_id = port_id; + m_event_type = type; + m_event_id = event_id; + } + + virtual bool handle(); + +private: + int m_thread_id; + uint8_t m_port_id; + TrexDpPortEvent::event_e m_event_type; + int m_event_id; + +}; #endif /* __TREX_STATELESS_MESSAGING_H__ */ + -- cgit From 2ae2e4e860194ee8d2b5ec5c4a1375751f51dd98 Mon Sep 17 00:00:00 2001 From: imarom Date: Thu, 19 Nov 2015 17:17:38 +0200 Subject: full async DP stop support --- src/stateless/cp/trex_stateless.cpp | 2 ++ src/stateless/cp/trex_stateless.h | 9 +++++++++ src/stateless/cp/trex_stateless_port.cpp | 6 +++++- src/stateless/messaging/trex_stateless_messaging.h | 16 ++++++++++++++++ 4 files changed, 32 insertions(+), 1 deletion(-) (limited to 'src/stateless') diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index 6ef24a7b..a4522837 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -51,6 +51,8 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) { } m_platform_api = cfg.m_platform_api; + m_publisher = cfg.m_publisher; + } /** diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index 57c6ef1d..5c11be1e 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -30,6 +30,7 @@ limitations under the License. #include #include #include +#include #include @@ -93,6 +94,7 @@ public: m_rpc_async_cfg = NULL; m_rpc_server_verbose = false; m_platform_api = NULL; + m_publisher = NULL; } const TrexRpcServerConfig *m_rpc_req_resp_cfg; @@ -100,6 +102,7 @@ public: const TrexPlatformApi *m_platform_api; bool m_rpc_server_verbose; uint8_t m_port_count; + TrexPublisher *m_publisher; }; /** @@ -150,6 +153,10 @@ public: return (m_platform_api); } + TrexPublisher * get_publisher() { + return m_publisher; + } + const std::vector get_port_list() { return m_ports; } @@ -170,6 +177,8 @@ protected: /* platform API */ const TrexPlatformApi *m_platform_api; + TrexPublisher *m_publisher; + std::mutex m_global_cp_lock; }; diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 13d0fc9f..489e2172 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -315,17 +315,21 @@ TrexStatelessPort::send_message_to_dp(TrexStatelessCpToDpMsgBase *msg) { */ void TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) { + Json::Value data; + switch (event_type) { case TrexDpPortEvent::EVENT_STOP: /* set a stop event */ change_state(PORT_STATE_STREAMS); /* send a ZMQ event */ + + data["port_id"] = m_port_id; + get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data); break; default: assert(0); } - printf("hey"); } diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 445e9378..ffb36124 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -192,6 +192,22 @@ public: virtual bool handle(); + int get_thread_id() { + return m_thread_id; + } + + uint8_t get_port_id() { + return m_port_id; + } + + TrexDpPortEvent::event_e get_event_type() { + return m_event_type; + } + + int get_event_id() { + return m_event_id; + } + private: int m_thread_id; uint8_t m_port_id; -- cgit From 34191e258a032cf78a171752eece903bb06f6751 Mon Sep 17 00:00:00 2001 From: imarom Date: Thu, 19 Nov 2015 17:47:42 +0200 Subject: disable stop event when you got it as a sync request --- src/stateless/cp/trex_dp_port_events.cpp | 5 +++++ src/stateless/cp/trex_dp_port_events.h | 6 ++++++ src/stateless/cp/trex_stateless_port.cpp | 3 +++ 3 files changed, 14 insertions(+) (limited to 'src/stateless') diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp index 533ab605..f74068a3 100644 --- a/src/stateless/cp/trex_dp_port_events.cpp +++ b/src/stateless/cp/trex_dp_port_events.cpp @@ -64,6 +64,11 @@ TrexDpPortEvents::wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int m_events[ev].wait_for_event(event_id, timeout_ms); } +void +TrexDpPortEvents::disable(TrexDpPortEvent::event_e ev) { + m_events[ev].disable(); +} + /** * handle an event * diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h index 309288df..557e590b 100644 --- a/src/stateless/cp/trex_dp_port_events.h +++ b/src/stateless/cp/trex_dp_port_events.h @@ -146,6 +146,12 @@ public: */ void wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms = -1); + /** + * disable an event (don't care) + * + */ + void disable(TrexDpPortEvent::event_e ev); + /** * event has occured * diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 489e2172..43960d5e 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -153,6 +153,9 @@ TrexStatelessPort::stop_traffic(void) { send_message_to_dp(stop_msg); change_state(PORT_STATE_STREAMS); + + /* mask out the DP stop event */ + m_dp_events.disable(TrexDpPortEvent::EVENT_STOP); } void -- cgit From 90e283923e5d860803eae5996a1247b1390ea36a Mon Sep 17 00:00:00 2001 From: imarom Date: Thu, 19 Nov 2015 18:14:58 +0200 Subject: a bug in back to back start/stop --- src/stateless/cp/trex_dp_port_events.cpp | 4 ++-- src/stateless/cp/trex_stateless_port.cpp | 13 +++++++------ 2 files changed, 9 insertions(+), 8 deletions(-) (limited to 'src/stateless') diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp index f74068a3..ba327e59 100644 --- a/src/stateless/cp/trex_dp_port_events.cpp +++ b/src/stateless/cp/trex_dp_port_events.cpp @@ -175,9 +175,9 @@ TrexDpPortEvent::handle_event(int thread_id, int event_id) { return; } - /* check the event id is matching the required event */ + /* check the event id is matching the required event - if not maybe its an old signal */ if (event_id != m_event_id) { - err(thread_id, event_id, "event key mismatch"); + return; } /* mark sure no double signal */ diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 43960d5e..795c0f12 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -122,14 +122,14 @@ TrexStatelessPort::start_traffic(double mul, double duration) { /* generate a message to all the relevant DP cores to start transmitting */ m_event_id = m_dp_events.generate_event_id(); + /* mark that DP event of stoppped is possible */ + m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, m_event_id); + TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_event_id, compiled_obj, duration); change_state(PORT_STATE_TX); send_message_to_dp(start_msg); - - /* mark that DP event of stoppped is possible */ - m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, m_event_id); } @@ -147,15 +147,16 @@ TrexStatelessPort::stop_traffic(void) { return; } + /* mask out the DP stop event */ + m_dp_events.disable(TrexDpPortEvent::EVENT_STOP); + /* generate a message to all the relevant DP cores to start transmitting */ TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id); send_message_to_dp(stop_msg); change_state(PORT_STATE_STREAMS); - - /* mask out the DP stop event */ - m_dp_events.disable(TrexDpPortEvent::EVENT_STOP); + } void -- cgit From b094110ef86889a0694dc71503c5610abaf47ebe Mon Sep 17 00:00:00 2001 From: imarom Date: Thu, 19 Nov 2015 22:13:55 +0200 Subject: BUG: didn't do all the states per port on the DP - now fixed --- src/stateless/cp/trex_stateless_port.cpp | 6 ++--- src/stateless/cp/trex_stateless_port.h | 2 -- src/stateless/dp/trex_stateless_dp_core.cpp | 2 +- src/stateless/dp/trex_stateless_dp_core.h | 27 ++++++++++++++-------- .../messaging/trex_stateless_messaging.cpp | 11 ++++++--- src/stateless/messaging/trex_stateless_messaging.h | 8 ++++--- 6 files changed, 35 insertions(+), 21 deletions(-) (limited to 'src/stateless') diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 795c0f12..fbc5f7c7 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -121,11 +121,11 @@ TrexStatelessPort::start_traffic(double mul, double duration) { } /* generate a message to all the relevant DP cores to start transmitting */ - m_event_id = m_dp_events.generate_event_id(); + int event_id = m_dp_events.generate_event_id(); /* mark that DP event of stoppped is possible */ - m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, m_event_id); + m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, event_id); - TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_event_id, compiled_obj, duration); + TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_obj, duration); change_state(PORT_STATE_TX); diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index da75284e..73157c15 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -258,11 +258,9 @@ private: std::string m_owner_handler; /* holds the DP cores associated with this port */ - //std::vector> m_cores_id_list; std::vector m_cores_id_list; TrexDpPortEvents m_dp_events; - int m_event_id; }; #endif /* __TREX_STATELESS_PORT_H__ */ diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index cde34a4b..6db66661 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -525,7 +525,7 @@ TrexStatelessDpCore::stop_traffic(uint8_t port_id) { TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id, port_id, TrexDpPortEvent::EVENT_STOP, - get_event_id()); + lp_port->get_event_id()); ring->Enqueue((CGenNode *)event_msg); } diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h index 7ee5abc4..c0bbe702 100644 --- a/src/stateless/dp/trex_stateless_dp_core.h +++ b/src/stateless/dp/trex_stateless_dp_core.h @@ -66,6 +66,18 @@ public: bool update_number_of_active_streams(uint32_t d); + state_e get_state() { + return m_state; + } + + void set_event_id(int event_id) { + m_event_id = event_id; + } + + int get_event_id() { + return m_event_id; + } + public: state_e m_state; @@ -75,6 +87,7 @@ public: std::vector m_active_nodes; /* holds the current active nodes */ CFlowGenListPerThread * m_core ; + int m_event_id; }; /* for now */ @@ -166,19 +179,13 @@ public: /* quit the main loop, work in both stateless in stateful, don't free memory trigger from master */ void quit_main_loop(); - void set_event_id(int event_id) { - m_event_id = event_id; - } - - int get_event_id() { - return m_event_id; + state_e get_state() { + return m_state; } bool set_stateless_next_node(CGenNodeStateless * cur_node, CGenNodeStateless * next_node); -private: - TrexStatelessDpPerPort * get_port_db(uint8_t port_id){ assert((m_local_port_offset==port_id) ||(m_local_port_offset+1==port_id)); @@ -188,6 +195,9 @@ private: } + +private: + void schedule_exit(); @@ -236,7 +246,6 @@ private: CFlowGenListPerThread * m_core; double m_duration; - int m_event_id; }; #endif /* __TREX_STATELESS_DP_CORE_H__ */ diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index c92ad68a..629fe24c 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -29,7 +29,8 @@ limitations under the License. /************************* start traffic message ************************/ -TrexStatelessDpStart::TrexStatelessDpStart(int event_id, TrexStreamsCompiledObj *obj, double duration) { +TrexStatelessDpStart::TrexStatelessDpStart(uint8_t port_id, int event_id, TrexStreamsCompiledObj *obj, double duration) { + m_port_id = port_id; m_event_id = event_id; m_obj = obj; m_duration = duration; @@ -45,7 +46,7 @@ TrexStatelessDpStart::clone() { TrexStreamsCompiledObj *new_obj = m_obj->clone(); - TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpStart(m_event_id, new_obj, m_duration); + TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpStart(m_port_id, m_event_id, new_obj, m_duration); return new_msg; } @@ -60,7 +61,7 @@ bool TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) { /* mark the event id for DP response */ - dp_core->set_event_id(m_event_id); + dp_core->get_port_db(m_port_id)->set_event_id(m_event_id); /* staet traffic */ dp_core->start_traffic(m_obj, m_duration); @@ -73,6 +74,10 @@ TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) { ************************/ bool TrexStatelessDpStop::handle(TrexStatelessDpCore *dp_core) { + if (dp_core->get_port_db(m_port_id)->get_state() == TrexStatelessDpPerPort::ppSTATE_IDLE) { + return true; + } + dp_core->stop_traffic(m_port_id); return true; } diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index ffb36124..2fb5a024 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -77,7 +77,7 @@ protected: class TrexStatelessDpStart : public TrexStatelessCpToDpMsgBase { public: - TrexStatelessDpStart(int m_event_id, TrexStreamsCompiledObj *obj, double duration); + TrexStatelessDpStart(uint8_t m_port_id, int m_event_id, TrexStreamsCompiledObj *obj, double duration); ~TrexStatelessDpStart(); @@ -87,9 +87,11 @@ public: private: - int m_event_id; + uint8_t m_port_id; + int m_event_id; TrexStreamsCompiledObj *m_obj; - double m_duration; + double m_duration; + }; /** -- cgit