summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xlinux/ws_main.py2
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py48
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py9
-rw-r--r--scripts/exp/ignore-0-ex.erfbin0 -> 880 bytes
-rw-r--r--scripts/exp/ignore-0.erfbin0 -> 880 bytes
-rw-r--r--src/gtest/trex_stateless_gtest.cpp121
-rwxr-xr-xsrc/main_dpdk.cpp9
-rw-r--r--src/publisher/trex_publisher.cpp2
-rw-r--r--src/publisher/trex_publisher.h2
-rw-r--r--src/stateless/cp/trex_stateless.cpp2
-rw-r--r--src/stateless/cp/trex_stateless.h9
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp6
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h16
13 files changed, 201 insertions, 25 deletions
diff --git a/linux/ws_main.py b/linux/ws_main.py
index 0bd61f70..617bcf37 100755
--- a/linux/ws_main.py
+++ b/linux/ws_main.py
@@ -117,7 +117,7 @@ main_src = SrcGroup(dir='src',
'utl_json.cpp',
'utl_cpuu.cpp',
'msg_manager.cpp',
-
+ 'publisher/trex_publisher.cpp',
'pal/linux/pal_utl.cpp',
'pal/linux/mbuf.cpp'
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
index 31bec93f..adb91d97 100644
--- a/scripts/automation/trex_control_plane/client/trex_async_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -8,6 +8,8 @@ except ImportError:
import client.outer_packages
from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage
+from common.text_opts import *
+
import json
import threading
import time
@@ -103,13 +105,9 @@ class TrexAsyncStatsManager():
return self.port_stats[str(port_id)]
- def update (self, snapshot):
-
- if snapshot['name'] == 'trex-global':
- self.__handle_snapshot(snapshot['data'])
- else:
- # for now ignore the rest
- return
+
+ def update (self, data):
+ self.__handle_snapshot(data)
def __handle_snapshot (self, snapshot):
@@ -151,10 +149,11 @@ class TrexAsyncStatsManager():
class CTRexAsyncClient():
- def __init__ (self, server, port):
+ def __init__ (self, server, port, stateless_client):
self.port = port
self.server = server
+ self.stateless_client = stateless_client
self.raw_snapshot = {}
@@ -165,14 +164,15 @@ class CTRexAsyncClient():
print "\nConnecting To ZMQ Publisher At {0}".format(self.tr)
self.active = True
- self.t = threading.Thread(target = self._run)
+ self.t = threading.Thread(target = self.run)
# kill this thread on exit and don't add it to the join list
self.t.setDaemon(True)
self.t.start()
- def _run (self):
+
+ def run (self):
# Socket to talk to server
self.context = zmq.Context()
@@ -185,10 +185,12 @@ class CTRexAsyncClient():
line = self.socket.recv_string();
msg = json.loads(line)
- key = msg['name']
- self.raw_snapshot[key] = msg['data']
+ name = msg['name']
+ data = msg['data']
+ type = msg['type']
+ self.raw_snapshot[name] = data
- self.stats.update(msg)
+ self.__dispatch(name, type, data)
def get_stats (self):
@@ -199,6 +201,26 @@ class CTRexAsyncClient():
return self.raw_snapshot
+ # dispatch the message to the right place
+ def __dispatch (self, name, type, data):
+ # stats
+ if name == "trex-global":
+ self.stats.update(data)
+ # events
+ elif name == "trex-event":
+ self.__handle_async_event(type, data)
+ else:
+ # ignore
+ pass
+
+ def __handle_async_event (self, type, data):
+ # DP stopped
+ if (type == 0):
+ port_id = int(data['port_id'])
+ print format_text("\n[Event] - Port {0} Stopped".format(port_id), 'bold')
+ # call the handler
+ self.stateless_client.async_event_port_stopped(port_id)
+
def stop (self):
self.active = False
self.t.join()
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 699f0af2..dd11fb67 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -320,6 +320,9 @@ class Port(object):
return self.ok()
+ ################# events handler ######################
+ def async_event_port_stopped (self):
+ self.state = self.STATE_STREAMS
class CTRexStatelessClient(object):
@@ -337,12 +340,16 @@ class CTRexStatelessClient(object):
self._server_version = None
self.__err_log = None
- self._async_client = CTRexAsyncClient(server, async_port)
+ self._async_client = CTRexAsyncClient(server, async_port, self)
self.streams_db = CStreamsDB()
self.connected = False
+ ################# events handler ######################
+ def async_event_port_stopped (self, port_id):
+ self.ports[port_id].async_event_port_stopped()
+
############# helper functions section ##############
def validate_port_list(self, port_id_list):
diff --git a/scripts/exp/ignore-0-ex.erf b/scripts/exp/ignore-0-ex.erf
new file mode 100644
index 00000000..92883717
--- /dev/null
+++ b/scripts/exp/ignore-0-ex.erf
Binary files differ
diff --git a/scripts/exp/ignore-0.erf b/scripts/exp/ignore-0.erf
new file mode 100644
index 00000000..92883717
--- /dev/null
+++ b/scripts/exp/ignore-0.erf
Binary files differ
diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp
index 432c7382..d7bf2ab6 100644
--- a/src/gtest/trex_stateless_gtest.cpp
+++ b/src/gtest/trex_stateless_gtest.cpp
@@ -46,16 +46,51 @@ public:
};
+/**
+ * handler for DP to CP messages
+ *
+ * @author imarom (19-Nov-15)
+ */
+class DpToCpHandler {
+public:
+ virtual void handle(TrexStatelessDpToCpMsgBase *msg) = 0;
+};
class CBasicStl {
public:
+
+
CBasicStl(){
m_time_diff=0.001;
m_threads=1;
m_dump_json=false;
+ m_dp_to_cp_handler = NULL;
+ }
+
+
+ void flush_dp_to_cp_messages() {
+
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(0);
+
+ while ( true ) {
+ CGenNode * node = NULL;
+ if (ring->Dequeue(node) != 0) {
+ break;
+ }
+ assert(node);
+
+ TrexStatelessDpToCpMsgBase * msg = (TrexStatelessDpToCpMsgBase *)node;
+ if (m_dp_to_cp_handler) {
+ m_dp_to_cp_handler->handle(msg);
+ }
+
+ delete msg;
+ }
+
}
+
bool init(void){
CErfIFStl erf_vif;
@@ -106,14 +141,18 @@ public:
printf(" %s \n",s.c_str());
}
+ flush_dp_to_cp_messages();
+
fl.Delete();
return (res);
}
public:
- int m_threads;
- double m_time_diff;
- bool m_dump_json;
+ int m_threads;
+ double m_time_diff;
+ bool m_dump_json;
+ DpToCpHandler *m_dp_to_cp_handler;
+
TrexStatelessCpToDpMsgBase * m_msg;
CNodeRing *m_ring_from_cp;
CFlowGenList fl;
@@ -292,7 +331,7 @@ TEST_F(basic_stl, simple_prog4) {
EXPECT_TRUE(compile.compile(streams, comp_obj) );
- TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 20.0 );
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, comp_obj.clone(), 20.0 );
t1.m_msg = lpstart;
@@ -1003,4 +1042,78 @@ TEST_F(basic_stl, compile_good_stream_id_compres) {
+class DpToCpHandlerStopEvent: public DpToCpHandler {
+public:
+ DpToCpHandlerStopEvent(int event_id) {
+ m_event_id = event_id;
+ }
+
+ virtual void handle(TrexStatelessDpToCpMsgBase *msg) {
+ /* first the message must be an event */
+ TrexDpPortEventMsg *event = dynamic_cast<TrexDpPortEventMsg *>(msg);
+ EXPECT_TRUE(event != NULL);
+ EXPECT_TRUE(event->get_event_type() == TrexDpPortEvent::EVENT_STOP);
+
+ EXPECT_TRUE(event->get_event_id() == m_event_id);
+ EXPECT_TRUE(event->get_port_id() == 0);
+
+ }
+
+private:
+ int m_event_id;
+};
+
+TEST_F(basic_stl, dp_stop_event) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/ignore";
+
+ TrexStreamsCompiler compile;
+
+ uint8_t port_id=0;
+
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST,0,0);
+ stream1->set_pps(1.0);
+ stream1->set_single_burst(100);
+
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_port_id= port_id;
+
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+ // stream - clean
+
+ TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
+
+ assert(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(17, comp_obj.clone(), 10.0 /*sec */ );
+
+
+ t1.m_msg = lpstart;
+
+ /* let me handle these */
+ DpToCpHandlerStopEvent handler(17);
+ t1.m_dp_to_cp_handler = &handler;
+
+ bool res=t1.init();
+ EXPECT_EQ_UINT32(1, res?1:0);
+
+ delete stream1 ;
+
+}
+
+
/********************************************* Itay Tests End *************************************/
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index 6c92172c..80739d35 100755
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -3199,19 +3199,21 @@ CGlobalTRex::check_for_dp_message_from_core(int thread_id) {
TrexStatelessDpToCpMsgBase * msg = (TrexStatelessDpToCpMsgBase *)node;
msg->handle();
+ delete msg;
}
}
-
+/**
+ * check for messages that arrived from DP to CP
+ *
+ */
void
CGlobalTRex::check_for_dp_messages() {
/* for all the cores - check for a new message */
for (int i = 0; i < get_cores_tx(); i++) {
check_for_dp_message_from_core(i);
}
-
-
}
bool CGlobalTRex::is_all_links_are_up(bool dump){
@@ -3532,6 +3534,7 @@ bool CGlobalTRex::Create(){
cfg.m_rpc_async_cfg = NULL;
cfg.m_rpc_server_verbose = false;
cfg.m_platform_api = new TrexDpdkPlatformApi();
+ cfg.m_publisher = &m_zmq_publisher;
m_trex_stateless = new TrexStateless(cfg);
}
diff --git a/src/publisher/trex_publisher.cpp b/src/publisher/trex_publisher.cpp
index 1afb558a..35653069 100644
--- a/src/publisher/trex_publisher.cpp
+++ b/src/publisher/trex_publisher.cpp
@@ -86,7 +86,7 @@ TrexPublisher::publish_event(event_type_e type, const Json::Value &data) {
Json::Value value;
std::string s;
- value["name"] = "event";
+ value["name"] = "trex-event";
value["type"] = type;
value["data"] = data;
diff --git a/src/publisher/trex_publisher.h b/src/publisher/trex_publisher.h
index 07d06678..82603fda 100644
--- a/src/publisher/trex_publisher.h
+++ b/src/publisher/trex_publisher.h
@@ -39,7 +39,7 @@ public:
void publish_json(const std::string &s);
enum event_type_e {
- EVENT_PORT_STOPPED = 1
+ EVENT_PORT_STOPPED = 0
};
void publish_event(event_type_e type, const Json::Value &data);
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp
index 6ef24a7b..a4522837 100644
--- a/src/stateless/cp/trex_stateless.cpp
+++ b/src/stateless/cp/trex_stateless.cpp
@@ -51,6 +51,8 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) {
}
m_platform_api = cfg.m_platform_api;
+ m_publisher = cfg.m_publisher;
+
}
/**
diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h
index 57c6ef1d..5c11be1e 100644
--- a/src/stateless/cp/trex_stateless.h
+++ b/src/stateless/cp/trex_stateless.h
@@ -30,6 +30,7 @@ limitations under the License.
#include <trex_stream.h>
#include <trex_stateless_port.h>
#include <trex_rpc_server_api.h>
+#include <publisher/trex_publisher.h>
#include <internal_api/trex_platform_api.h>
@@ -93,6 +94,7 @@ public:
m_rpc_async_cfg = NULL;
m_rpc_server_verbose = false;
m_platform_api = NULL;
+ m_publisher = NULL;
}
const TrexRpcServerConfig *m_rpc_req_resp_cfg;
@@ -100,6 +102,7 @@ public:
const TrexPlatformApi *m_platform_api;
bool m_rpc_server_verbose;
uint8_t m_port_count;
+ TrexPublisher *m_publisher;
};
/**
@@ -150,6 +153,10 @@ public:
return (m_platform_api);
}
+ TrexPublisher * get_publisher() {
+ return m_publisher;
+ }
+
const std::vector <TrexStatelessPort *> get_port_list() {
return m_ports;
}
@@ -170,6 +177,8 @@ protected:
/* platform API */
const TrexPlatformApi *m_platform_api;
+ TrexPublisher *m_publisher;
+
std::mutex m_global_cp_lock;
};
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 13d0fc9f..489e2172 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -315,17 +315,21 @@ TrexStatelessPort::send_message_to_dp(TrexStatelessCpToDpMsgBase *msg) {
*/
void
TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) {
+ Json::Value data;
+
switch (event_type) {
case TrexDpPortEvent::EVENT_STOP:
/* set a stop event */
change_state(PORT_STATE_STREAMS);
/* send a ZMQ event */
+
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data);
break;
default:
assert(0);
}
- printf("hey");
}
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index 445e9378..ffb36124 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -192,6 +192,22 @@ public:
virtual bool handle();
+ int get_thread_id() {
+ return m_thread_id;
+ }
+
+ uint8_t get_port_id() {
+ return m_port_id;
+ }
+
+ TrexDpPortEvent::event_e get_event_type() {
+ return m_event_type;
+ }
+
+ int get_event_id() {
+ return m_event_id;
+ }
+
private:
int m_thread_id;
uint8_t m_port_id;