summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xlinux/ws_main.py1
-rwxr-xr-xlinux_dpdk/ws_main.py2
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py2
-rw-r--r--src/gtest/trex_stateless_gtest.cpp16
-rwxr-xr-xsrc/main_dpdk.cpp148
-rw-r--r--src/publisher/trex_publisher.cpp107
-rw-r--r--src/publisher/trex_publisher.h54
-rw-r--r--src/stateless/cp/trex_dp_port_events.cpp215
-rw-r--r--src/stateless/cp/trex_dp_port_events.h165
-rw-r--r--src/stateless/cp/trex_stateless.cpp2
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp56
-rw-r--r--src/stateless/cp/trex_stateless_port.h38
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp12
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h9
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp24
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h80
16 files changed, 810 insertions, 121 deletions
diff --git a/linux/ws_main.py b/linux/ws_main.py
index 65ca4522..0bd61f70 100755
--- a/linux/ws_main.py
+++ b/linux/ws_main.py
@@ -152,6 +152,7 @@ stateless_src = SrcGroup(dir='src/stateless/',
'cp/trex_stateless.cpp',
'cp/trex_stateless_port.cpp',
'cp/trex_streams_compiler.cpp',
+ 'cp/trex_dp_port_events.cpp',
'dp/trex_stateless_dp_core.cpp',
'messaging/trex_stateless_messaging.cpp',
])
diff --git a/linux_dpdk/ws_main.py b/linux_dpdk/ws_main.py
index 6591a241..f7acd49d 100755
--- a/linux_dpdk/ws_main.py
+++ b/linux_dpdk/ws_main.py
@@ -107,6 +107,7 @@ main_src = SrcGroup(dir='src',
'utl_yaml.cpp',
'nat_check.cpp',
'msg_manager.cpp',
+ 'publisher/trex_publisher.cpp',
'pal/linux_dpdk/pal_utl.cpp',
'pal/linux_dpdk/mbuf.cpp'
]);
@@ -159,6 +160,7 @@ stateless_src = SrcGroup(dir='src/stateless/',
'cp/trex_stateless.cpp',
'cp/trex_stateless_port.cpp',
'cp/trex_streams_compiler.cpp',
+ 'cp/trex_dp_port_events.cpp',
'dp/trex_stateless_dp_core.cpp',
'messaging/trex_stateless_messaging.cpp'
])
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 3dcfae28..699f0af2 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -790,7 +790,7 @@ class CTRexStatelessClient(object):
if opts is None:
return RC_ERR("bad command line paramters")
- delay_sec = opts.d if opts.d else 1
+ delay_sec = opts.duration if (opts.duration > 0) else 1
print format_text("Waiting for {0} seconds...\n".format(delay_sec), 'bold')
time.sleep(delay_sec)
diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp
index ac0a5e63..432c7382 100644
--- a/src/gtest/trex_stateless_gtest.cpp
+++ b/src/gtest/trex_stateless_gtest.cpp
@@ -362,7 +362,7 @@ TEST_F(basic_stl, simple_prog3) {
EXPECT_TRUE(compile.compile(streams, comp_obj) );
- TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 50.0 );
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, comp_obj.clone(), 50.0 );
t1.m_msg = lpstart;
@@ -424,7 +424,7 @@ TEST_F(basic_stl, simple_prog2) {
EXPECT_TRUE(compile.compile(streams, comp_obj) );
- TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 10.0 );
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, comp_obj.clone(), 10.0 );
t1.m_msg = lpstart;
@@ -486,7 +486,7 @@ TEST_F(basic_stl, simple_prog1) {
EXPECT_TRUE(compile.compile(streams, comp_obj) );
- TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 10.0 );
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, comp_obj.clone(), 10.0 );
t1.m_msg = lpstart;
@@ -531,7 +531,7 @@ TEST_F(basic_stl, single_pkt_burst1) {
assert(compile.compile(streams, comp_obj) );
- TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 10.0 );
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, comp_obj.clone(), 10.0 );
t1.m_msg = lpstart;
@@ -582,7 +582,7 @@ TEST_F(basic_stl, single_pkt) {
assert(compile.compile(streams, comp_obj) );
- TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 10.0 /*sec */ );
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, comp_obj.clone(), 10.0 /*sec */ );
t1.m_msg = lpstart;
@@ -639,7 +639,7 @@ TEST_F(basic_stl, multi_pkt1) {
assert(compile.compile(streams, comp_obj) );
- TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 10 );
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, comp_obj.clone(), 10 );
t1.m_msg = lpstart;
@@ -702,7 +702,7 @@ TEST_F(basic_stl, multi_pkt2) {
assert(compile.compile(streams, comp_obj) );
- TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 10 );
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, comp_obj.clone(), 10 );
t1.m_msg = lpstart;
@@ -748,7 +748,7 @@ TEST_F(basic_stl, multi_burst1) {
assert(compile.compile(streams, comp_obj) );
- TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 40 );
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, comp_obj.clone(), 40 );
t1.m_msg = lpstart;
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index 865c84ed..6c92172c 100755
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -58,6 +58,7 @@ limitations under the License.
#include <stateless/cp/trex_stateless.h>
#include <stateless/dp/trex_stream_node.h>
+#include <publisher/trex_publisher.h>
#include <stateless/messaging/trex_stateless_messaging.h>
#include <../linux_dpdk/version.h>
@@ -2398,71 +2399,6 @@ private:
};
-class CZMqPublisher {
-public:
- CZMqPublisher(){
- m_context=0;
- m_publisher=0;
- }
-
- bool Create(uint16_t port,bool disable);
- void Delete();
- void publish_json(std::string & s);
-private:
- void show_zmq_last_error(char *s);
-private:
- void * m_context;
- void * m_publisher;
-};
-
-void CZMqPublisher::show_zmq_last_error(char *s){
- printf(" ERROR %s \n",s);
- printf(" ZMQ: %s",zmq_strerror (zmq_errno ()));
- exit(-1);
-}
-
-
-bool CZMqPublisher::Create(uint16_t port,bool disable){
-
- if (disable) {
- return(true);
- }
- m_context = zmq_ctx_new ();
- if ( m_context == 0 ) {
- show_zmq_last_error((char *)"can't connect to ZMQ library");
- }
- m_publisher = zmq_socket (m_context, ZMQ_PUB);
- if ( m_context == 0 ) {
- show_zmq_last_error((char *)"can't create ZMQ socket");
- }
- char buffer[100];
- sprintf(buffer,"tcp://*:%d",port);
- int rc=zmq_bind (m_publisher, buffer);
- if (rc != 0 ) {
- sprintf(buffer,"can't bind to ZMQ socket %d",port);
- show_zmq_last_error(buffer);
- }
- printf("zmq publisher at: %s \n",buffer);
- return (true);
-}
-
-
-void CZMqPublisher::Delete(){
- if (m_publisher) {
- zmq_close (m_publisher);
- }
- if (m_context) {
- zmq_ctx_destroy (m_context);
- }
-}
-
-
-void CZMqPublisher::publish_json(std::string & s){
- if ( m_publisher ){
- int size = zmq_send (m_publisher, s.c_str(), s.length(), 0);
- assert(size==s.length());
- }
-}
class CPerPortStats {
public:
@@ -2825,6 +2761,10 @@ private:
void try_stop_all_dp();
/* send message to all dp cores */
int send_message_all_dp(TrexStatelessCpToDpMsgBase *msg);
+
+ void check_for_dp_message_from_core(int thread_id);
+ void check_for_dp_messages();
+
public:
int start_send_master();
@@ -2965,7 +2905,7 @@ private:
CLatencyVmPort m_latency_vm_vports[BP_MAX_PORTS]; /* vm driver */
CLatencyPktInfo m_latency_pkt;
- CZMqPublisher m_zmq_publisher;
+ TrexPublisher m_zmq_publisher;
public:
TrexStateless *m_trex_stateless;
@@ -3233,6 +3173,46 @@ int CGlobalTRex::reset_counters(){
return (0);
}
+/**
+ * check for a single core
+ *
+ * @author imarom (19-Nov-15)
+ *
+ * @param thread_id
+ */
+void
+CGlobalTRex::check_for_dp_message_from_core(int thread_id) {
+
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(thread_id);
+
+ /* fast path check */
+ if ( likely ( ring->isEmpty() ) ) {
+ return;
+ }
+
+ while ( true ) {
+ CGenNode * node = NULL;
+ if (ring->Dequeue(node) != 0) {
+ break;
+ }
+ assert(node);
+
+ TrexStatelessDpToCpMsgBase * msg = (TrexStatelessDpToCpMsgBase *)node;
+ msg->handle();
+ }
+
+}
+
+
+void
+CGlobalTRex::check_for_dp_messages() {
+ /* for all the cores - check for a new message */
+ for (int i = 0; i < get_cores_tx(); i++) {
+ check_for_dp_message_from_core(i);
+ }
+
+
+}
bool CGlobalTRex::is_all_links_are_up(bool dump){
bool all_link_are=true;
@@ -3495,21 +3475,7 @@ int CGlobalTRex::ixgbe_start(void){
bool CGlobalTRex::Create(){
CFlowsYamlInfo pre_yaml_info;
- if (get_is_stateless()) {
-
- TrexStatelessCfg cfg;
-
- TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, global_platform_cfg_info.m_zmq_rpc_port);
-
- cfg.m_port_count = CGlobalInfo::m_options.m_expected_portd;
- cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg;
- cfg.m_rpc_async_cfg = NULL;
- cfg.m_rpc_server_verbose = false;
- cfg.m_platform_api = new TrexDpdkPlatformApi();
-
- m_trex_stateless = new TrexStateless(cfg);
-
- } else {
+ if (!get_is_stateless()) {
pre_yaml_info.load_from_yaml_file(CGlobalInfo::m_options.cfg_file);
}
@@ -3553,6 +3519,23 @@ bool CGlobalTRex::Create(){
CGlobalInfo::init_pools(rx_mbuf);
ixgbe_start();
dump_config(stdout);
+
+ /* start stateless */
+ if (get_is_stateless()) {
+
+ TrexStatelessCfg cfg;
+
+ TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, global_platform_cfg_info.m_zmq_rpc_port);
+
+ cfg.m_port_count = CGlobalInfo::m_options.m_expected_portd;
+ cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg;
+ cfg.m_rpc_async_cfg = NULL;
+ cfg.m_rpc_server_verbose = false;
+ cfg.m_platform_api = new TrexDpdkPlatformApi();
+
+ m_trex_stateless = new TrexStateless(cfg);
+ }
+
return (true);
}
@@ -4119,6 +4102,9 @@ int CGlobalTRex::run_in_master(){
m_trex_stateless->generate_publish_snapshot(json);
m_zmq_publisher.publish_json(json);
+ /* check from messages from DP */
+ check_for_dp_messages();
+
delay(500);
if ( is_all_cores_finished() ) {
diff --git a/src/publisher/trex_publisher.cpp b/src/publisher/trex_publisher.cpp
new file mode 100644
index 00000000..1afb558a
--- /dev/null
+++ b/src/publisher/trex_publisher.cpp
@@ -0,0 +1,107 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include "trex_publisher.h"
+#include <zmq.h>
+#include <assert.h>
+#include <sstream>
+#include <iostream>
+
+/**
+ * create the publisher
+ *
+ */
+bool
+TrexPublisher::Create(uint16_t port, bool disable){
+
+ if (disable) {
+ return (true);
+ }
+
+ m_context = zmq_ctx_new();
+ if ( m_context == 0 ) {
+ show_zmq_last_error("can't connect to ZMQ library");
+ }
+
+ m_publisher = zmq_socket (m_context, ZMQ_PUB);
+ if ( m_context == 0 ) {
+ show_zmq_last_error("can't create ZMQ socket");
+ }
+
+ std::stringstream ss;
+ ss << "tcp://*:" << port;
+
+ int rc = zmq_bind (m_publisher, ss.str().c_str());
+ if (rc != 0 ) {
+ show_zmq_last_error("can't bind to ZMQ socket at " + ss.str());
+ }
+
+ std::cout << "zmq publisher at: " << ss.str() << "\n";
+ return (true);
+}
+
+
+void
+TrexPublisher::Delete(){
+ if (m_publisher) {
+ zmq_close (m_publisher);
+ m_publisher = NULL;
+ }
+ if (m_context) {
+ zmq_ctx_destroy (m_context);
+ m_context = NULL;
+ }
+}
+
+
+void
+TrexPublisher::publish_json(const std::string &s){
+ if (m_publisher) {
+ int size = zmq_send (m_publisher, s.c_str(), s.length(), 0);
+ assert(size == s.length());
+ }
+}
+
+void
+TrexPublisher::publish_event(event_type_e type, const Json::Value &data) {
+ Json::FastWriter writer;
+ Json::Value value;
+ std::string s;
+
+ value["name"] = "event";
+ value["type"] = type;
+ value["data"] = data;
+
+ s = writer.write(value);
+ publish_json(s);
+}
+
+/**
+ * error handling
+ *
+ */
+void
+TrexPublisher::show_zmq_last_error(const std::string &err){
+ std::cout << " ERROR " << err << "\n";
+ std::cout << " ZMQ: " << zmq_strerror (zmq_errno ());
+ exit(-1);
+}
+
diff --git a/src/publisher/trex_publisher.h b/src/publisher/trex_publisher.h
new file mode 100644
index 00000000..07d06678
--- /dev/null
+++ b/src/publisher/trex_publisher.h
@@ -0,0 +1,54 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#ifndef __TREX_PUBLISHER_H__
+#define __TREX_PUBLISHER_H__
+
+#include <stdint.h>
+#include <string>
+#include <json/json.h>
+
+class TrexPublisher {
+
+public:
+
+ TrexPublisher() {
+ m_context = NULL;
+ m_publisher = NULL;
+ }
+
+ bool Create(uint16_t port, bool disable);
+ void Delete();
+ void publish_json(const std::string &s);
+
+ enum event_type_e {
+ EVENT_PORT_STOPPED = 1
+ };
+
+ void publish_event(event_type_e type, const Json::Value &data);
+
+private:
+ void show_zmq_last_error(const std::string &err);
+private:
+ void * m_context;
+ void * m_publisher;
+};
+
+#endif /* __TREX_PUBLISHER_H__ */
diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp
new file mode 100644
index 00000000..533ab605
--- /dev/null
+++ b/src/stateless/cp/trex_dp_port_events.cpp
@@ -0,0 +1,215 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include <trex_dp_port_events.h>
+#include <sstream>
+#include <os_time.h>
+#include <trex_stateless.h>
+
+/**
+ * port events
+ */
+void
+TrexDpPortEvents::create(TrexStatelessPort *port) {
+ m_port = port;
+
+ for (int i = 0; i < TrexDpPortEvent::EVENT_MAX; i++) {
+ m_events[i].create((TrexDpPortEvent::event_e) i, port);
+ }
+
+ m_event_id_counter = EVENT_ID_INVALID;
+}
+
+/**
+ * generate a new event ID
+ *
+ */
+int
+TrexDpPortEvents::generate_event_id() {
+ return (++m_event_id_counter);
+}
+
+/**
+ * mark the next allowed event
+ * all other events will be disabled
+ *
+ */
+void
+TrexDpPortEvents::wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms) {
+
+ /* first disable all events */
+ for (TrexDpPortEvent & e : m_events) {
+ e.disable();
+ }
+
+ /* mark this event as allowed */
+ m_events[ev].wait_for_event(event_id, timeout_ms);
+}
+
+/**
+ * handle an event
+ *
+ */
+void
+TrexDpPortEvents::handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id) {
+ m_events[ev].handle_event(thread_id, event_id);
+}
+
+/***********
+ * single event object
+ *
+ */
+
+void
+TrexDpPortEvent::create(event_e type, TrexStatelessPort *port) {
+ m_event_type = type;
+ m_port = port;
+
+ /* add the core ids to the hash */
+ m_signal.clear();
+ for (int core_id : m_port->get_core_id_list()) {
+ m_signal[core_id] = false;
+ }
+
+ /* event is disabled */
+ disable();
+}
+
+
+/**
+ * wait the event using event id and timeout
+ *
+ */
+void
+TrexDpPortEvent::wait_for_event(int event_id, int timeout_ms) {
+
+ /* set a new event id */
+ m_event_id = event_id;
+
+ /* do we have a timeout ? */
+ if (timeout_ms > 0) {
+ m_expire_limit_ms = os_get_time_msec() + timeout_ms;
+ } else {
+ m_expire_limit_ms = -1;
+ }
+
+ /* prepare the signal array */
+ m_pending_cnt = 0;
+ for (auto & core_pair : m_signal) {
+ core_pair.second = false;
+ m_pending_cnt++;
+ }
+}
+
+void
+TrexDpPortEvent::disable() {
+ m_event_id = TrexDpPortEvents::EVENT_ID_INVALID;
+}
+
+/**
+ * get the event status
+ *
+ */
+
+TrexDpPortEvent::event_status_e
+TrexDpPortEvent::status() {
+
+ /* is it even active ? */
+ if (m_event_id == TrexDpPortEvents::EVENT_ID_INVALID) {
+ return (EVENT_DISABLE);
+ }
+
+ /* did it occured ? */
+ if (m_pending_cnt == 0) {
+ return (EVENT_OCCURED);
+ }
+
+ /* so we are enabled and the event did not occur - maybe we timed out ? */
+ if ( (m_expire_limit_ms > 0) && (os_get_time_msec() > m_expire_limit_ms) ) {
+ return (EVENT_TIMED_OUT);
+ }
+
+ /* so we are still waiting... */
+ return (EVENT_PENDING);
+
+}
+
+void
+TrexDpPortEvent::err(int thread_id, int event_id, const std::string &err_msg) {
+ std::stringstream err;
+ err << "DP event '" << event_name(m_event_type) << "' on thread id '" << thread_id << "' with key '" << event_id <<"' - ";
+}
+
+/**
+ * event occured
+ *
+ */
+void
+TrexDpPortEvent::handle_event(int thread_id, int event_id) {
+
+ /* if the event is disabled - we don't care */
+ if (!is_active()) {
+ return;
+ }
+
+ /* check the event id is matching the required event */
+ if (event_id != m_event_id) {
+ err(thread_id, event_id, "event key mismatch");
+ }
+
+ /* mark sure no double signal */
+ if (m_signal.at(thread_id)) {
+ err(thread_id, event_id, "double signal");
+
+ } else {
+ /* mark */
+ m_signal.at(thread_id) = true;
+ m_pending_cnt--;
+ }
+
+ /* event occured */
+ if (m_pending_cnt == 0) {
+ m_port->on_dp_event_occured(m_event_type);
+ m_event_id = TrexDpPortEvents::EVENT_ID_INVALID;
+ }
+}
+
+bool
+TrexDpPortEvent::is_active() {
+ return (status() != EVENT_DISABLE);
+}
+
+bool
+TrexDpPortEvent::has_timeout_expired() {
+ return (status() == EVENT_TIMED_OUT);
+}
+
+const char *
+TrexDpPortEvent::event_name(event_e type) {
+ switch (type) {
+ case EVENT_STOP:
+ return "DP STOP";
+
+ default:
+ throw TrexException("unknown event type");
+ }
+
+}
diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h
new file mode 100644
index 00000000..309288df
--- /dev/null
+++ b/src/stateless/cp/trex_dp_port_events.h
@@ -0,0 +1,165 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#ifndef __TREX_DP_PORT_EVENTS_H__
+#define __TREX_DP_PORT_EVENTS_H__
+
+#include <unordered_map>
+#include <string>
+
+class TrexStatelessPort;
+
+/**
+ * describes a single DP event related to port
+ *
+ * @author imarom (18-Nov-15)
+ */
+class TrexDpPortEvent {
+public:
+
+ enum event_e {
+ EVENT_STOP = 1,
+ EVENT_MAX
+ };
+
+ /**
+ * status of the event for the port
+ */
+ enum event_status_e {
+ EVENT_DISABLE,
+ EVENT_PENDING,
+ EVENT_TIMED_OUT,
+ EVENT_OCCURED
+ };
+
+ /**
+ * init for the event
+ *
+ */
+ void create(event_e type, TrexStatelessPort *port);
+
+ /**
+ * create a new pending event
+ *
+ */
+ void wait_for_event(int event_id, int timeout_ms = -1);
+
+ /**
+ * mark event as not allowed to happen
+ *
+ */
+ void disable();
+
+ /**
+ * get the event status
+ *
+ */
+ event_status_e status();
+
+ /**
+ * event occured
+ *
+ */
+ void handle_event(int thread_id, int event_id);
+
+ /**
+ * returns true if event is active
+ *
+ */
+ bool is_active();
+
+ /**
+ * has timeout already expired ?
+ *
+ */
+ bool has_timeout_expired();
+
+ /**
+ * generate error
+ *
+ */
+ void err(int thread_id, int event_id, const std::string &err_msg);
+
+ /**
+ * event to name
+ *
+ */
+ static const char * event_name(event_e type);
+
+
+private:
+
+ event_e m_event_type;
+ std::unordered_map<int, bool> m_signal;
+ int m_pending_cnt;
+
+ TrexStatelessPort *m_port;
+ int m_event_id;
+ int m_expire_limit_ms;
+
+};
+
+/**
+ * all the events related to a port
+ *
+ */
+class TrexDpPortEvents {
+public:
+ friend class TrexDpPortEvent;
+
+ void create(TrexStatelessPort *port);
+
+ /**
+ * generate a new event ID to be used with wait_for_event
+ *
+ */
+ int generate_event_id();
+
+ /**
+ * wait a new DP event on the port
+ * returns a key which will be used to identify
+ * the event happened
+ *
+ * @author imarom (18-Nov-15)
+ *
+ * @param ev - type of event
+ * @param event_id - a unique identifier for the event
+ * @param timeout_ms - does it has a timeout ?
+ *
+ */
+ void wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms = -1);
+
+ /**
+ * event has occured
+ *
+ */
+ void handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id);
+
+private:
+ static const int EVENT_ID_INVALID = -1;
+
+ TrexDpPortEvent m_events[TrexDpPortEvent::EVENT_MAX];
+ int m_event_id_counter;
+
+ TrexStatelessPort *m_port;
+
+};
+
+#endif /* __TREX_DP_PORT_EVENTS_H__ */
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp
index e0e95450..6ef24a7b 100644
--- a/src/stateless/cp/trex_stateless.cpp
+++ b/src/stateless/cp/trex_stateless.cpp
@@ -47,7 +47,7 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) {
m_port_count = cfg.m_port_count;
for (int i = 0; i < m_port_count; i++) {
- m_ports.push_back(new TrexStatelessPort(i));
+ m_ports.push_back(new TrexStatelessPort(i, cfg.m_platform_api));
}
m_platform_api = cfg.m_platform_api;
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index cbc5a328..13d0fc9f 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -52,9 +52,25 @@ using namespace std;
* trex stateless port
*
**************************/
-TrexStatelessPort::TrexStatelessPort(uint8_t port_id) : m_port_id(port_id) {
+TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) {
+ std::vector<std::pair<uint8_t, uint8_t>> core_pair_list;
+
+ m_port_id = port_id;
+
m_port_state = PORT_STATE_IDLE;
clear_owner();
+
+ /* get the DP cores belonging to this port */
+ api->port_id_to_cores(m_port_id, core_pair_list);
+
+ for (auto core_pair : core_pair_list) {
+
+ /* send the core id */
+ m_cores_id_list.push_back(core_pair.first);
+ }
+
+ /* init the events DP DB */
+ m_dp_events.create(this);
}
@@ -105,11 +121,16 @@ TrexStatelessPort::start_traffic(double mul, double duration) {
}
/* generate a message to all the relevant DP cores to start transmitting */
- TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(compiled_obj, duration);
+ m_event_id = m_dp_events.generate_event_id();
+ TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_event_id, compiled_obj, duration);
+
+ change_state(PORT_STATE_TX);
send_message_to_dp(start_msg);
- change_state(PORT_STATE_TX);
+ /* mark that DP event of stoppped is possible */
+ m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, m_event_id);
+
}
/**
@@ -279,15 +300,32 @@ TrexStatelessPort::encode_stats(Json::Value &port) {
void
TrexStatelessPort::send_message_to_dp(TrexStatelessCpToDpMsgBase *msg) {
- std::vector<std::pair<uint8_t, uint8_t>> cores_id_list;
-
- get_stateless_obj()->get_platform_api()->port_id_to_cores(m_port_id, cores_id_list);
-
- for (auto core_pair : cores_id_list) {
+ for (auto core_id : m_cores_id_list) {
/* send the message to the core */
- CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_pair.first);
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id);
ring->Enqueue((CGenNode *)msg->clone());
}
}
+
+/**
+ * when a DP (async) event occurs - handle it
+ *
+ */
+void
+TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) {
+ switch (event_type) {
+
+ case TrexDpPortEvent::EVENT_STOP:
+ /* set a stop event */
+ change_state(PORT_STATE_STREAMS);
+ /* send a ZMQ event */
+ break;
+
+ default:
+ assert(0);
+
+ }
+ printf("hey");
+}
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index b533f793..da75284e 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -22,7 +22,9 @@ limitations under the License.
#define __TREX_STATELESS_PORT_H__
#include <trex_stream.h>
+#include <trex_dp_port_events.h>
+class TrexPlatformApi;
class TrexStatelessCpToDpMsgBase;
/**
@@ -31,6 +33,8 @@ class TrexStatelessCpToDpMsgBase;
* @author imarom (31-Aug-15)
*/
class TrexStatelessPort {
+ friend class TrexDpPortEvent;
+
public:
/**
@@ -54,7 +58,7 @@ public:
RC_ERR_FAILED_TO_COMPILE_STREAMS
};
- TrexStatelessPort(uint8_t port_id);
+ TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api);
/**
* acquire port
@@ -199,6 +203,10 @@ public:
m_stream_table.get_object_list(object_list);
}
+ TrexDpPortEvents & get_dp_events() {
+ return m_dp_events;
+ }
+
private:
@@ -224,6 +232,10 @@ private:
}
+ const std::vector<int> get_core_id_list () {
+ return m_cores_id_list;
+ }
+
bool verify_state(int state, bool should_throw = true) const;
void change_state(port_state_e new_state);
@@ -232,11 +244,25 @@ private:
void send_message_to_dp(TrexStatelessCpToDpMsgBase *msg);
- TrexStreamTable m_stream_table;
- uint8_t m_port_id;
- port_state_e m_port_state;
- std::string m_owner;
- std::string m_owner_handler;
+ /**
+ * triggered when event occurs
+ *
+ */
+ void on_dp_event_occured(TrexDpPortEvent::event_e event_type);
+
+
+ TrexStreamTable m_stream_table;
+ uint8_t m_port_id;
+ port_state_e m_port_state;
+ std::string m_owner;
+ std::string m_owner_handler;
+
+ /* holds the DP cores associated with this port */
+ //std::vector<std::pair<uint8_t, uint8_t>> m_cores_id_list;
+ std::vector<int> m_cores_id_list;
+
+ TrexDpPortEvents m_dp_events;
+ int m_event_id;
};
#endif /* __TREX_STATELESS_PORT_H__ */
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index c4fdd44b..cde34a4b 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -486,6 +486,7 @@ TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
if ( duration > 0.0 ){
add_port_duration( duration ,obj->get_port_id() );
}
+
}
@@ -516,6 +517,17 @@ TrexStatelessDpCore::stop_traffic(uint8_t port_id) {
schedule_exit();
}
+
+ /* inform the control plane we stopped - this might be a async stop
+ (streams ended)
+ */
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
+ TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
+ port_id,
+ TrexDpPortEvent::EVENT_STOP,
+ get_event_id());
+ ring->Enqueue((CGenNode *)event_msg);
+
}
/**
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index 85afcf8f..7ee5abc4 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -166,6 +166,14 @@ public:
/* quit the main loop, work in both stateless in stateful, don't free memory trigger from master */
void quit_main_loop();
+ void set_event_id(int event_id) {
+ m_event_id = event_id;
+ }
+
+ int get_event_id() {
+ return m_event_id;
+ }
+
bool set_stateless_next_node(CGenNodeStateless * cur_node,
CGenNodeStateless * next_node);
@@ -228,6 +236,7 @@ private:
CFlowGenListPerThread * m_core;
double m_duration;
+ int m_event_id;
};
#endif /* __TREX_STATELESS_DP_CORE_H__ */
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index 856fd9e3..c92ad68a 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -22,12 +22,17 @@ limitations under the License.
#include <trex_stateless_messaging.h>
#include <trex_stateless_dp_core.h>
#include <trex_streams_compiler.h>
+#include <trex_stateless.h>
+
#include <string.h>
/*************************
start traffic message
************************/
-TrexStatelessDpStart::TrexStatelessDpStart(TrexStreamsCompiledObj *obj, double duration) : m_obj(obj), m_duration(duration) {
+TrexStatelessDpStart::TrexStatelessDpStart(int event_id, TrexStreamsCompiledObj *obj, double duration) {
+ m_event_id = event_id;
+ m_obj = obj;
+ m_duration = duration;
}
@@ -40,7 +45,7 @@ TrexStatelessDpStart::clone() {
TrexStreamsCompiledObj *new_obj = m_obj->clone();
- TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpStart(new_obj, m_duration);
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpStart(m_event_id, new_obj, m_duration);
return new_msg;
}
@@ -54,7 +59,12 @@ TrexStatelessDpStart::~TrexStatelessDpStart() {
bool
TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) {
+ /* mark the event id for DP response */
+ dp_core->set_event_id(m_event_id);
+
+ /* staet traffic */
dp_core->start_traffic(m_obj, m_duration);
+
return true;
}
@@ -96,7 +106,6 @@ bool TrexStatelessDpQuit::handle(TrexStatelessDpCore *dp_core){
return (true);
}
-
bool TrexStatelessDpCanQuit::handle(TrexStatelessDpCore *dp_core){
if ( dp_core->are_all_ports_idle() ){
@@ -115,3 +124,12 @@ TrexStatelessDpCanQuit::clone(){
}
+/************************* messages from DP to CP **********************/
+bool
+TrexDpPortEventMsg::handle() {
+ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(m_port_id);
+ port->get_dp_events().handle_event(m_event_type, m_thread_id, m_event_id);
+
+ return (true);
+}
+
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index 7dc307c7..445e9378 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -23,6 +23,7 @@ limitations under the License.
#define __TREX_STATELESS_MESSAGING_H__
#include <msg_manager.h>
+#include <trex_dp_port_events.h>
class TrexStatelessDpCore;
class TrexStreamsCompiledObj;
@@ -42,12 +43,8 @@ public:
virtual ~TrexStatelessCpToDpMsgBase() {
}
- /**
- * virtual function to handle a message
- *
- */
- virtual bool handle(TrexStatelessDpCore *dp_core) = 0;
+ virtual bool handle(TrexStatelessDpCore *dp_core) = 0;
/**
* clone the current message
@@ -66,7 +63,9 @@ public:
/* no copy constructor */
TrexStatelessCpToDpMsgBase(TrexStatelessCpToDpMsgBase &) = delete;
-private:
+
+protected:
+ int m_event_id;
bool m_quit_scheduler;
};
@@ -78,16 +77,17 @@ private:
class TrexStatelessDpStart : public TrexStatelessCpToDpMsgBase {
public:
- TrexStatelessDpStart(TrexStreamsCompiledObj *obj, double duration);
+ TrexStatelessDpStart(int m_event_id, TrexStreamsCompiledObj *obj, double duration);
~TrexStatelessDpStart();
- virtual bool handle(TrexStatelessDpCore *dp_core);
-
virtual TrexStatelessCpToDpMsgBase * clone();
+ virtual bool handle(TrexStatelessDpCore *dp_core);
private:
+
+ int m_event_id;
TrexStreamsCompiledObj *m_obj;
double m_duration;
};
@@ -103,10 +103,10 @@ public:
TrexStatelessDpStop(uint8_t port_id) : m_port_id(port_id) {
}
- virtual bool handle(TrexStatelessDpCore *dp_core);
-
virtual TrexStatelessCpToDpMsgBase * clone();
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
private:
uint8_t m_port_id;
};
@@ -122,9 +122,11 @@ public:
TrexStatelessDpQuit() {
}
- virtual bool handle(TrexStatelessDpCore *dp_core);
virtual TrexStatelessCpToDpMsgBase * clone();
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
};
/**
@@ -145,4 +147,58 @@ public:
+/************************* messages from DP to CP **********************/
+
+/**
+ * defines the base class for CP to DP messages
+ *
+ * @author imarom (27-Oct-15)
+ */
+class TrexStatelessDpToCpMsgBase {
+public:
+
+ TrexStatelessDpToCpMsgBase() {
+ }
+
+ virtual ~TrexStatelessDpToCpMsgBase() {
+ }
+
+ /**
+ * virtual function to handle a message
+ *
+ */
+ virtual bool handle() = 0;
+
+ /* no copy constructor */
+ TrexStatelessDpToCpMsgBase(TrexStatelessDpToCpMsgBase &) = delete;
+
+};
+
+
+/**
+ * a message indicating an event has happened on a port at the
+ * DP
+ *
+ */
+class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase {
+public:
+
+ TrexDpPortEventMsg(int thread_id, uint8_t port_id, TrexDpPortEvent::event_e type, int event_id) {
+ m_thread_id = thread_id;
+ m_port_id = port_id;
+ m_event_type = type;
+ m_event_id = event_id;
+ }
+
+ virtual bool handle();
+
+private:
+ int m_thread_id;
+ uint8_t m_port_id;
+ TrexDpPortEvent::event_e m_event_type;
+ int m_event_id;
+
+};
+
#endif /* __TREX_STATELESS_MESSAGING_H__ */
+