diff options
Diffstat (limited to 'src/main_dpdk.cpp')
-rwxr-xr-x | src/main_dpdk.cpp | 148 |
1 files changed, 67 insertions, 81 deletions
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index 865c84ed..6c92172c 100755 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -58,6 +58,7 @@ limitations under the License. #include <stateless/cp/trex_stateless.h> #include <stateless/dp/trex_stream_node.h> +#include <publisher/trex_publisher.h> #include <stateless/messaging/trex_stateless_messaging.h> #include <../linux_dpdk/version.h> @@ -2398,71 +2399,6 @@ private: }; -class CZMqPublisher { -public: - CZMqPublisher(){ - m_context=0; - m_publisher=0; - } - - bool Create(uint16_t port,bool disable); - void Delete(); - void publish_json(std::string & s); -private: - void show_zmq_last_error(char *s); -private: - void * m_context; - void * m_publisher; -}; - -void CZMqPublisher::show_zmq_last_error(char *s){ - printf(" ERROR %s \n",s); - printf(" ZMQ: %s",zmq_strerror (zmq_errno ())); - exit(-1); -} - - -bool CZMqPublisher::Create(uint16_t port,bool disable){ - - if (disable) { - return(true); - } - m_context = zmq_ctx_new (); - if ( m_context == 0 ) { - show_zmq_last_error((char *)"can't connect to ZMQ library"); - } - m_publisher = zmq_socket (m_context, ZMQ_PUB); - if ( m_context == 0 ) { - show_zmq_last_error((char *)"can't create ZMQ socket"); - } - char buffer[100]; - sprintf(buffer,"tcp://*:%d",port); - int rc=zmq_bind (m_publisher, buffer); - if (rc != 0 ) { - sprintf(buffer,"can't bind to ZMQ socket %d",port); - show_zmq_last_error(buffer); - } - printf("zmq publisher at: %s \n",buffer); - return (true); -} - - -void CZMqPublisher::Delete(){ - if (m_publisher) { - zmq_close (m_publisher); - } - if (m_context) { - zmq_ctx_destroy (m_context); - } -} - - -void CZMqPublisher::publish_json(std::string & s){ - if ( m_publisher ){ - int size = zmq_send (m_publisher, s.c_str(), s.length(), 0); - assert(size==s.length()); - } -} class CPerPortStats { public: @@ -2825,6 +2761,10 @@ private: void try_stop_all_dp(); /* send message to all dp cores */ int send_message_all_dp(TrexStatelessCpToDpMsgBase *msg); + + void check_for_dp_message_from_core(int thread_id); + void check_for_dp_messages(); + public: int start_send_master(); @@ -2965,7 +2905,7 @@ private: CLatencyVmPort m_latency_vm_vports[BP_MAX_PORTS]; /* vm driver */ CLatencyPktInfo m_latency_pkt; - CZMqPublisher m_zmq_publisher; + TrexPublisher m_zmq_publisher; public: TrexStateless *m_trex_stateless; @@ -3233,6 +3173,46 @@ int CGlobalTRex::reset_counters(){ return (0); } +/** + * check for a single core + * + * @author imarom (19-Nov-15) + * + * @param thread_id + */ +void +CGlobalTRex::check_for_dp_message_from_core(int thread_id) { + + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(thread_id); + + /* fast path check */ + if ( likely ( ring->isEmpty() ) ) { + return; + } + + while ( true ) { + CGenNode * node = NULL; + if (ring->Dequeue(node) != 0) { + break; + } + assert(node); + + TrexStatelessDpToCpMsgBase * msg = (TrexStatelessDpToCpMsgBase *)node; + msg->handle(); + } + +} + + +void +CGlobalTRex::check_for_dp_messages() { + /* for all the cores - check for a new message */ + for (int i = 0; i < get_cores_tx(); i++) { + check_for_dp_message_from_core(i); + } + + +} bool CGlobalTRex::is_all_links_are_up(bool dump){ bool all_link_are=true; @@ -3495,21 +3475,7 @@ int CGlobalTRex::ixgbe_start(void){ bool CGlobalTRex::Create(){ CFlowsYamlInfo pre_yaml_info; - if (get_is_stateless()) { - - TrexStatelessCfg cfg; - - TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, global_platform_cfg_info.m_zmq_rpc_port); - - cfg.m_port_count = CGlobalInfo::m_options.m_expected_portd; - cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg; - cfg.m_rpc_async_cfg = NULL; - cfg.m_rpc_server_verbose = false; - cfg.m_platform_api = new TrexDpdkPlatformApi(); - - m_trex_stateless = new TrexStateless(cfg); - - } else { + if (!get_is_stateless()) { pre_yaml_info.load_from_yaml_file(CGlobalInfo::m_options.cfg_file); } @@ -3553,6 +3519,23 @@ bool CGlobalTRex::Create(){ CGlobalInfo::init_pools(rx_mbuf); ixgbe_start(); dump_config(stdout); + + /* start stateless */ + if (get_is_stateless()) { + + TrexStatelessCfg cfg; + + TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, global_platform_cfg_info.m_zmq_rpc_port); + + cfg.m_port_count = CGlobalInfo::m_options.m_expected_portd; + cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg; + cfg.m_rpc_async_cfg = NULL; + cfg.m_rpc_server_verbose = false; + cfg.m_platform_api = new TrexDpdkPlatformApi(); + + m_trex_stateless = new TrexStateless(cfg); + } + return (true); } @@ -4119,6 +4102,9 @@ int CGlobalTRex::run_in_master(){ m_trex_stateless->generate_publish_snapshot(json); m_zmq_publisher.publish_json(json); + /* check from messages from DP */ + check_for_dp_messages(); + delay(500); if ( is_all_cores_finished() ) { |