summaryrefslogtreecommitdiffstats
path: root/src/main_dpdk.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/main_dpdk.cpp')
-rwxr-xr-xsrc/main_dpdk.cpp151
1 files changed, 70 insertions, 81 deletions
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index f66bcd9e..b1c9ed12 100755
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -58,6 +58,7 @@ limitations under the License.
#include <stateless/cp/trex_stateless.h>
#include <stateless/dp/trex_stream_node.h>
+#include <publisher/trex_publisher.h>
#include <stateless/messaging/trex_stateless_messaging.h>
#include <../linux_dpdk/version.h>
@@ -2398,71 +2399,6 @@ private:
};
-class CZMqPublisher {
-public:
- CZMqPublisher(){
- m_context=0;
- m_publisher=0;
- }
-
- bool Create(uint16_t port,bool disable);
- void Delete();
- void publish_json(std::string & s);
-private:
- void show_zmq_last_error(char *s);
-private:
- void * m_context;
- void * m_publisher;
-};
-
-void CZMqPublisher::show_zmq_last_error(char *s){
- printf(" ERROR %s \n",s);
- printf(" ZMQ: %s",zmq_strerror (zmq_errno ()));
- exit(-1);
-}
-
-
-bool CZMqPublisher::Create(uint16_t port,bool disable){
-
- if (disable) {
- return(true);
- }
- m_context = zmq_ctx_new ();
- if ( m_context == 0 ) {
- show_zmq_last_error((char *)"can't connect to ZMQ library");
- }
- m_publisher = zmq_socket (m_context, ZMQ_PUB);
- if ( m_context == 0 ) {
- show_zmq_last_error((char *)"can't create ZMQ socket");
- }
- char buffer[100];
- sprintf(buffer,"tcp://*:%d",port);
- int rc=zmq_bind (m_publisher, buffer);
- if (rc != 0 ) {
- sprintf(buffer,"can't bind to ZMQ socket %d",port);
- show_zmq_last_error(buffer);
- }
- printf("zmq publisher at: %s \n",buffer);
- return (true);
-}
-
-
-void CZMqPublisher::Delete(){
- if (m_publisher) {
- zmq_close (m_publisher);
- }
- if (m_context) {
- zmq_ctx_destroy (m_context);
- }
-}
-
-
-void CZMqPublisher::publish_json(std::string & s){
- if ( m_publisher ){
- int size = zmq_send (m_publisher, s.c_str(), s.length(), 0);
- assert(size==s.length());
- }
-}
class CPerPortStats {
public:
@@ -2825,6 +2761,10 @@ private:
void try_stop_all_dp();
/* send message to all dp cores */
int send_message_all_dp(TrexStatelessCpToDpMsgBase *msg);
+
+ void check_for_dp_message_from_core(int thread_id);
+ void check_for_dp_messages();
+
public:
int start_send_master();
@@ -2965,7 +2905,7 @@ private:
CLatencyVmPort m_latency_vm_vports[BP_MAX_PORTS]; /* vm driver */
CLatencyPktInfo m_latency_pkt;
- CZMqPublisher m_zmq_publisher;
+ TrexPublisher m_zmq_publisher;
public:
TrexStateless *m_trex_stateless;
@@ -3233,6 +3173,48 @@ int CGlobalTRex::reset_counters(){
return (0);
}
+/**
+ * check for a single core
+ *
+ * @author imarom (19-Nov-15)
+ *
+ * @param thread_id
+ */
+void
+CGlobalTRex::check_for_dp_message_from_core(int thread_id) {
+
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(thread_id);
+
+ /* fast path check */
+ if ( likely ( ring->isEmpty() ) ) {
+ return;
+ }
+
+ while ( true ) {
+ CGenNode * node = NULL;
+ if (ring->Dequeue(node) != 0) {
+ break;
+ }
+ assert(node);
+
+ TrexStatelessDpToCpMsgBase * msg = (TrexStatelessDpToCpMsgBase *)node;
+ msg->handle();
+ delete msg;
+ }
+
+}
+
+/**
+ * check for messages that arrived from DP to CP
+ *
+ */
+void
+CGlobalTRex::check_for_dp_messages() {
+ /* for all the cores - check for a new message */
+ for (int i = 0; i < get_cores_tx(); i++) {
+ check_for_dp_message_from_core(i);
+ }
+}
bool CGlobalTRex::is_all_links_are_up(bool dump){
bool all_link_are=true;
@@ -3495,21 +3477,7 @@ int CGlobalTRex::ixgbe_start(void){
bool CGlobalTRex::Create(){
CFlowsYamlInfo pre_yaml_info;
- if (get_is_stateless()) {
-
- TrexStatelessCfg cfg;
-
- TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, global_platform_cfg_info.m_zmq_rpc_port);
-
- cfg.m_port_count = CGlobalInfo::m_options.m_expected_portd;
- cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg;
- cfg.m_rpc_async_cfg = NULL;
- cfg.m_rpc_server_verbose = false;
- cfg.m_platform_api = new TrexDpdkPlatformApi();
-
- m_trex_stateless = new TrexStateless(cfg);
-
- } else {
+ if (!get_is_stateless()) {
pre_yaml_info.load_from_yaml_file(CGlobalInfo::m_options.cfg_file);
}
@@ -3553,6 +3521,24 @@ bool CGlobalTRex::Create(){
CGlobalInfo::init_pools(rx_mbuf);
ixgbe_start();
dump_config(stdout);
+
+ /* start stateless */
+ if (get_is_stateless()) {
+
+ TrexStatelessCfg cfg;
+
+ TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, global_platform_cfg_info.m_zmq_rpc_port);
+
+ cfg.m_port_count = CGlobalInfo::m_options.m_expected_portd;
+ cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg;
+ cfg.m_rpc_async_cfg = NULL;
+ cfg.m_rpc_server_verbose = false;
+ cfg.m_platform_api = new TrexDpdkPlatformApi();
+ cfg.m_publisher = &m_zmq_publisher;
+
+ m_trex_stateless = new TrexStateless(cfg);
+ }
+
return (true);
}
@@ -4119,6 +4105,9 @@ int CGlobalTRex::run_in_master(){
m_trex_stateless->generate_publish_snapshot(json);
m_zmq_publisher.publish_json(json);
+ /* check from messages from DP */
+ check_for_dp_messages();
+
delay(500);
if ( is_all_cores_finished() ) {