From 9932ff8dcf4f8b6b6f3986832f8a1a8f8461c743 Mon Sep 17 00:00:00 2001 From: imarom Date: Mon, 18 Jan 2016 06:59:36 -0500 Subject: async publish now --- .../trex_control_plane/client/trex_async_client.py | 53 ++++++++-- .../client/trex_stateless_client.py | 11 +- .../client_utils/jsonrpc_client.py | 7 +- .../trex_control_plane/common/trex_types.py | 18 +++- .../trex_control_plane/console/trex_console.py | 49 +++------ scripts/stl_test_api/__init__.py | 38 +++++-- scripts/stl_test_example.py | 54 ++++++---- src/internal_api/trex_platform_api.h | 3 + src/main_dpdk.cpp | 113 +++++++++++++-------- src/mock/trex_platform_api_mock.cpp | 1 + src/publisher/trex_publisher.cpp | 15 +++ src/publisher/trex_publisher.h | 12 +++ src/rpc-server/commands/trex_rpc_cmd_general.cpp | 16 +++ src/rpc-server/commands/trex_rpc_cmds.h | 1 + src/rpc-server/trex_rpc_async_server.cpp | 3 + src/rpc-server/trex_rpc_cmds_table.cpp | 1 + src/sim/trex_sim_stateless.cpp | 4 + 17 files changed, 280 insertions(+), 119 deletions(-) diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 66e65a32..2bb0e9cd 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -16,6 +16,7 @@ import time import datetime import zmq import re +import random from common.trex_stats import * from common.trex_streams import * @@ -155,6 +156,7 @@ class CTRexAsyncClient(): self.stats = CTRexAsyncStatsManager() self.last_data_recv_ts = 0 + self.async_barriers = [] self.connected = False @@ -188,17 +190,16 @@ class CTRexAsyncClient(): self.connected = True - - # wait for data streaming from the server - timeout = time.time() + 5 - while not self.is_alive(): - time.sleep(0.01) - if time.time() > timeout: - self.disconnect() - return RC_ERR("*** [subscriber] - no data flow from server at : " + self.tr) + # send a barrier and wait for ack + rc = self.block_on_stats() + if not rc: + self.disconnect() + return rc return RC_OK() + + # disconnect def disconnect (self): @@ -284,10 +285,46 @@ class CTRexAsyncClient(): # stats if name == "trex-global": self.stateless_client.handle_async_stats_update(data) + # events elif name == "trex-event": self.stateless_client.handle_async_event(type, data) + + # barriers + elif name == "trex-barrier": + self.handle_async_barrier(type, data) else: pass + # async barrier handling routine + def handle_async_barrier (self, type, data): + + for b in self.async_barriers: + if b['key'] == type: + b['ack'] = True + + + # force update a new snapshot from the server + def block_on_stats(self, timeout = 5): + + # set a random key + key = random.getrandbits(32) + barrier = {'key': key, 'ack' : False} + + # add to the queue + self.async_barriers.append(barrier) + + rc = self.stateless_client.transmit("publish_now", params = {'key' : key}) + if not rc: + return rc + + expr = time.time() + timeout + while not barrier['ack']: + time.sleep(0.001) + if time.time() > expr: + return RC_ERR("*** [subscriber] - timeout - no data flow from server at : " + self.tr) + + return RC_OK() + + diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index b7700c7c..105c4d01 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -55,12 +55,16 @@ class LoggerApi(object): return (self.level >= level) + # simple log message with verbose def log (self, msg, level = VERBOSE_REGULAR, newline = True): if not self.check_verbose(level): return self.write(msg, newline) + # annotates an action with a RC - writes to log the result + def annotate (self, rc, desc = None, show_status = True): + rc.annotate(self.log, desc, show_status) # default logger - to stdout class DefaultLogger(LoggerApi): @@ -82,7 +86,7 @@ class CTRexStatelessClient(object): server = "localhost", sync_port = 4501, async_port = 4500, - quiet = False, + verbose_level = LoggerApi.VERBOSE_REGULAR, virtual = False, logger = None): @@ -90,13 +94,14 @@ class CTRexStatelessClient(object): self.user = username + # logger if not logger: self.logger = DefaultLogger() else: self.logger = logger - if quiet: - self.logger.set_verbose(self.logger.VERBOSE_QUIET) + # initial verbose + self.logger.set_verbose(verbose_level) self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual, self.logger) diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py index a5e01340..e08f5d69 100755 --- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py @@ -124,7 +124,7 @@ class JsonRpcClient(object): break except zmq.Again: tries += 1 - if tries > 10: + if tries > 5: self.disconnect() return RC_ERR("*** [RPC] - Failed to send message to server") @@ -136,9 +136,9 @@ class JsonRpcClient(object): break except zmq.Again: tries += 1 - if tries > 10: + if tries > 5: self.disconnect() - return RC_ERR("*** [RPC] - Failed to get server response") + return RC_ERR("*** [RPC] - Failed to get server response at {0}".format(self.transport)) self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n") @@ -173,6 +173,7 @@ class JsonRpcClient(object): else: return RC_ERR(response_json["error"]["message"]) + # if no error there should be a result if ("result" not in response_json): return RC_ERR("Malformed Response ({0})".format(str(response_json))) diff --git a/scripts/automation/trex_control_plane/common/trex_types.py b/scripts/automation/trex_control_plane/common/trex_types.py index 21deffd2..5c29f59b 100644 --- a/scripts/automation/trex_control_plane/common/trex_types.py +++ b/scripts/automation/trex_control_plane/common/trex_types.py @@ -17,7 +17,7 @@ class RC(): def __init__ (self, rc = None, data = None): self.rc_list = [] - if (rc != None) and (data != None): + if (rc != None): tuple_rc = namedtuple('RC', ['rc', 'data']) self.rc_list.append(tuple_rc(rc, data)) @@ -35,16 +35,26 @@ class RC(): def data (self): d = [x.data if x.rc else "" for x in self.rc_list] - return (d if len(d) > 1 else d[0]) + return (d if len(d) != 1 else d[0]) def err (self): e = [x.data if not x.rc else "" for x in self.rc_list] - return (e if len(e) > 1 else e[0]) + return (e if len(e) != 1 else e[0]) def __str__ (self): return str(self.data()) if self else str(self.err()) - def annotate (self, log_func, desc = None, show_status = True): + def prn_func (self, msg, newline = True): + if newline: + print msg + else: + print msg, + + def annotate (self, log_func = None, desc = None, show_status = True): + + if not log_func: + log_func = self.prn_func + if desc: log_func(format_text('\n{:<60}'.format(desc), 'bold'), newline = False) else: diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index 2ae92fb6..f086c208 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -29,7 +29,7 @@ import sys import tty, termios import trex_root_path from common.trex_streams import * -from client.trex_stateless_client import CTRexStatelessClient +from client.trex_stateless_client import CTRexStatelessClient, LoggerApi from common.text_opts import * from client_utils.general_utils import user_input, get_current_user from client_utils import parsing_opts @@ -266,36 +266,6 @@ class TRexConsole(TRexGeneralCmd): return targets - # annotation method - @staticmethod - def annotate (desc, rc = None, err_log = None, ext_err_msg = None): - print format_text('\n{:<40}'.format(desc), 'bold'), - if rc == None: - print "\n" - return - - if rc == False: - # do we have a complex log object ? - if isinstance(err_log, list): - print "" - for func in err_log: - if func: - print func - print "" - - elif isinstance(err_log, str): - print "\n" + err_log + "\n" - - print format_text("[FAILED]\n", 'red', 'bold') - if ext_err_msg: - print format_text(ext_err_msg + "\n", 'blue', 'bold') - - return False - - else: - print format_text("[SUCCESS]\n", 'green', 'bold') - return True - ####################### shell commands ####################### @verify_connected @@ -667,10 +637,19 @@ def main(): options = parser.parse_args() # Stateless client connection - stateless_client = CTRexStatelessClient(options.user, options.server, options.port, options.pub, options.quiet) + if options.quiet: + verbose_level = LoggerApi.VERBOSE_QUIET + elif options.verbose: + verbose_level = LoggerApi.VERBOSE_HIGH + else: + verbose_level = LoggerApi.VERBOSE_REGULAR - if not options.quiet: - print "\nlogged as {0}".format(format_text(options.user, 'bold')) + # Stateless client connection + stateless_client = CTRexStatelessClient(options.user, + options.server, + options.port, + options.pub, + verbose_level) # TUI or no acquire will give us READ ONLY mode if options.tui or not options.acquire: @@ -679,7 +658,7 @@ def main(): rc = stateless_client.connect("RW") # unable to connect - bye - if rc.bad(): + if not rc: rc.annotate() return diff --git a/scripts/stl_test_api/__init__.py b/scripts/stl_test_api/__init__.py index 6df72136..7381d88e 100644 --- a/scripts/stl_test_api/__init__.py +++ b/scripts/stl_test_api/__init__.py @@ -1,3 +1,9 @@ +# +# TRex stateless API +# provides a layer for communicating with TRex +# using Python API +# + import sys import os import time @@ -43,24 +49,29 @@ class BasicTestAPI(object): # main object def __init__ (self, server = "localhost", sync_port = 4501, async_port = 4500): self.logger = BasicTestAPI.Logger() - self.client = CTRexStatelessClient(logger = self.logger, sync_port = sync_port, async_port = async_port) + self.client = CTRexStatelessClient(logger = self.logger, + sync_port = sync_port, + async_port = async_port, + verbose_level = LoggerApi.VERBOSE_REGULAR) + + self.__invalid_stats = True + # connect to the stateless client def connect (self): rc = self.client.connect(mode = "RWF") self.__verify_rc(rc) + # disconnect from the stateless client def disconnect (self): self.client.disconnect() - def __verify_rc (self, rc): - if rc.bad(): - raise self.Failure(rc) - def inject_profile (self, filename, rate = "1", duration = None): - cmd = "-f {0} -m {1}".format(filename, rate) + self.__invalid_stats = True + + cmd = "--total -f {0} -m {1}".format(filename, rate) if duration: cmd += " -d {0}".format(duration) @@ -74,9 +85,24 @@ class BasicTestAPI(object): def get_stats (self): + if self.__invalid_stats: + # send a barrier + rc = self.client.block_on_stats() + self.__verify_rc(rc) + self.__invalid_stats = False + total_stats = trex_stats.CPortStats(None) for port in self.client.ports.values(): total_stats += port.port_stats return total_stats + + + # some internal methods + + # verify RC return value + def __verify_rc (self, rc): + if not rc: + raise self.Failure(rc) + diff --git a/scripts/stl_test_example.py b/scripts/stl_test_example.py index 689ed316..7974758d 100644 --- a/scripts/stl_test_example.py +++ b/scripts/stl_test_example.py @@ -1,24 +1,42 @@ -from stl_test_api import BasicTestAPI -x = BasicTestAPI() +# simple test that uses simple API with stateless TRex +#from stl_test_api import BasicTestAPI +api_path = os.path.dirname(os.path.abspath(__file__)) +sys.path.insert(0, os.path.join(api_path, '../automation/trex_control_plane/client/')) -try: - x.connect() - - s = x.get_stats() - - print "input packets before test: %s" % s['ipackets'] - - x.inject_profile("stl/imix_1pkt.yaml", rate = "5gbps", duration = 1) - x.wait_while_traffic_on() - - s = x.get_stats() - - print "input packets after test: %s" % s['ipackets'] - - print "Test passed :-)\n" +from trex_stateless_client import CTRexStatelessClient, LoggerApi +c = CTRexStatelessClient() +try: + c.connect() + #before_ipackets = x.get_stats().get_rel('ipackets') + c.cmd_start_line("-f stl/imix_1pkt.yaml -m 5mpps -d 1") + c.cmd_wait_on_traffic() finally: - x.disconnect() + c.disconnect() + +#x = BasicTestAPI() +# +#try: +# x.connect() +# +# before_ipackets = x.get_stats().get_rel('ipackets') +# +# print "input packets before test: %s" % before_ipackets +# +# x.inject_profile("stl/imix_1pkt.yaml", rate = "5mpps", duration = 1) +# x.wait_while_traffic_on() +# +# after_ipackets = x.get_stats().get_rel('ipackets') +# +# print "input packets after test: %s" % after_ipackets +# +# if (after_ipackets - before_ipackets) == 5000000: +# print "Test passed :-)\n" +# else: +# print "Test failed :-(\n" +# +#finally: +# x.disconnect() diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h index 3ae49da8..f8bc10d5 100644 --- a/src/internal_api/trex_platform_api.h +++ b/src/internal_api/trex_platform_api.h @@ -110,6 +110,7 @@ public: virtual void get_global_stats(TrexPlatformGlobalStats &stats) const = 0; virtual void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const = 0; virtual void get_interface_info(uint8_t interface_id, std::string &driver_name, driver_speed_e &speed) const = 0; + virtual void publish_async_data_now(uint32_t key) const = 0; virtual uint8_t get_dp_core_count() const = 0; virtual ~TrexPlatformApi() {} @@ -127,6 +128,7 @@ public: void get_global_stats(TrexPlatformGlobalStats &stats) const; void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const; void get_interface_info(uint8_t interface_id, std::string &driver_name, driver_speed_e &speed) const; + void publish_async_data_now(uint32_t key) const; uint8_t get_dp_core_count() const; }; @@ -146,6 +148,7 @@ public: speed = SPEED_INVALID; } + void publish_async_data_now(uint32_t key) const {} uint8_t get_dp_core_count() const; }; diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index 7c25b2e2..a5e87172 100755 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -2609,8 +2609,12 @@ private: bool is_all_cores_finished(); public: + + void publish_async_data(); + void publish_async_barrier(uint32_t key); + void dump_stats(FILE *fd, - std::string & json,CGlobalStats::DumpFormat format); + CGlobalStats::DumpFormat format); void dump_template_info(std::string & json); bool sanity_check(); void update_stats(void); @@ -2649,6 +2653,7 @@ private: CLatencyVmPort m_latency_vm_vports[BP_MAX_PORTS]; /* vm driver */ CLatencyPktInfo m_latency_pkt; TrexPublisher m_zmq_publisher; + CGlobalStats m_stats; public: TrexStateless *m_trex_stateless; @@ -3448,11 +3453,11 @@ void CGlobalTRex::dump_template_info(std::string & json){ json+="]}" ; } -void CGlobalTRex::dump_stats(FILE *fd,std::string & json, - CGlobalStats::DumpFormat format){ - CGlobalStats stats; +void CGlobalTRex::dump_stats(FILE *fd, CGlobalStats::DumpFormat format){ + update_stats(); - get_stats(stats); + get_stats(m_stats); + if (format==CGlobalStats::dmpTABLE) { if ( m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ){ switch (m_io_modes.m_pp_mode ){ @@ -3461,11 +3466,11 @@ void CGlobalTRex::dump_stats(FILE *fd,std::string & json, break; case CTrexGlobalIoMode::ppTABLE: fprintf(fd,"\n-Per port stats table \n"); - stats.Dump(fd,CGlobalStats::dmpTABLE); + m_stats.Dump(fd,CGlobalStats::dmpTABLE); break; case CTrexGlobalIoMode::ppSTANDARD: fprintf(fd,"\n-Per port stats - standard\n"); - stats.Dump(fd,CGlobalStats::dmpSTANDARD); + m_stats.Dump(fd,CGlobalStats::dmpSTANDARD); break; }; @@ -3475,22 +3480,62 @@ void CGlobalTRex::dump_stats(FILE *fd,std::string & json, break; case CTrexGlobalIoMode::apENABLE: fprintf(fd,"\n-Global stats enabled \n"); - stats.DumpAllPorts(fd); + m_stats.DumpAllPorts(fd); break; }; } }else{ /* at exit , always need to dump it in standartd mode for scripts*/ - stats.Dump(fd,format); - stats.DumpAllPorts(fd); + m_stats.Dump(fd,format); + m_stats.DumpAllPorts(fd); } - stats.dump_json(json); + +} + + +void +CGlobalTRex::publish_async_data() { + std::string json; + + m_stats.dump_json(json); + m_zmq_publisher.publish_json(json); + + /* generator json , all cores are the same just sample the first one */ + m_fl.m_threads_info[0]->m_node_gen.dump_json(json); + m_zmq_publisher.publish_json(json); + + + if ( !get_is_stateless() ){ + dump_template_info(json); + m_zmq_publisher.publish_json(json); + } + + if ( get_is_rx_check_mode() ) { + m_mg.rx_check_dump_json(json ); + m_zmq_publisher.publish_json(json); + } + + /* backward compatible */ + m_mg.dump_json(json ); + m_zmq_publisher.publish_json(json); + + /* more info */ + m_mg.dump_json_v2(json ); + m_zmq_publisher.publish_json(json); + + /* stateless info - nothing for now */ + //m_trex_stateless->generate_publish_snapshot(json); + //m_zmq_publisher.publish_json(json); } +void +CGlobalTRex::publish_async_barrier(uint32_t key) { + m_zmq_publisher.publish_barrier(key); +} -int CGlobalTRex::run_in_master(){ +int CGlobalTRex::run_in_master() { - std::string json; + bool was_stopped=false; if ( get_is_stateless() ) { @@ -3530,7 +3575,7 @@ int CGlobalTRex::run_in_master(){ m_io_modes.DumpHelp(stdout); } - dump_stats(stdout,json,CGlobalStats::dmpTABLE); + dump_stats(stdout,CGlobalStats::dmpTABLE); if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ) { fprintf (stdout," current time : %.1f sec \n",now_sec()); @@ -3542,16 +3587,6 @@ int CGlobalTRex::run_in_master(){ fprintf (stdout," test duration : %.1f sec \n",d); } - m_zmq_publisher.publish_json(json); - - /* generator json , all cores are the same just sample the first one */ - m_fl.m_threads_info[0]->m_node_gen.dump_json(json); - m_zmq_publisher.publish_json(json); - - if ( !get_is_stateless() ){ - dump_template_info(json); - m_zmq_publisher.publish_json(json); - } if ( !CGlobalInfo::m_options.is_latency_disabled() ){ m_mg.update(); @@ -3591,24 +3626,12 @@ int CGlobalTRex::run_in_master(){ } - if ( get_is_rx_check_mode() ) { - m_mg.rx_check_dump_json(json ); - m_zmq_publisher.publish_json(json); - } - - /* backward compatible */ - m_mg.dump_json(json ); - m_zmq_publisher.publish_json(json); - - /* more info */ - m_mg.dump_json_v2(json ); - m_zmq_publisher.publish_json(json); + } - /* stateless info */ - m_trex_stateless->generate_publish_snapshot(json); - m_zmq_publisher.publish_json(json); + /* publish data */ + publish_async_data(); /* check from messages from DP */ check_for_dp_messages(); @@ -3679,11 +3702,10 @@ int CGlobalTRex::run_in_core(virtual_thread_id_t virt_core_id){ int CGlobalTRex::stop_master(){ delay(1000); - std::string json; fprintf(stdout," ==================\n"); fprintf(stdout," interface sum \n"); fprintf(stdout," ==================\n"); - dump_stats(stdout,json,CGlobalStats::dmpSTANDARD); + dump_stats(stdout,CGlobalStats::dmpSTANDARD); fprintf(stdout," ==================\n"); fprintf(stdout," \n\n"); @@ -3724,7 +3746,7 @@ int CGlobalTRex::stop_master(){ m_mg.DumpRxCheckVerification(stdout,total_tx_rx_check); } - dump_stats(stdout,json,CGlobalStats::dmpSTANDARD); + dump_stats(stdout,CGlobalStats::dmpSTANDARD); dump_post_test_stats(stdout); m_fl.Delete(); @@ -4899,3 +4921,10 @@ TrexDpdkPlatformApi::get_interface_info(uint8_t interface_id, driver_name = CTRexExtendedDriverDb::Ins()->get_driver_name(); speed = CTRexExtendedDriverDb::Ins()->get_drv()->get_driver_speed(); } + +void +TrexDpdkPlatformApi::publish_async_data_now(uint32_t key) const { + g_trex.publish_async_data(); + g_trex.publish_async_barrier(key); +} + diff --git a/src/mock/trex_platform_api_mock.cpp b/src/mock/trex_platform_api_mock.cpp index 416c4b69..7cacd96c 100644 --- a/src/mock/trex_platform_api_mock.cpp +++ b/src/mock/trex_platform_api_mock.cpp @@ -51,3 +51,4 @@ void TrexMockPlatformApi::port_id_to_cores(uint8_t port_id, std::vector> &cores_id_list) const { cores_id_list.push_back(std::make_pair(0, 0)); } + diff --git a/src/publisher/trex_publisher.cpp b/src/publisher/trex_publisher.cpp index 35653069..f56d56df 100644 --- a/src/publisher/trex_publisher.cpp +++ b/src/publisher/trex_publisher.cpp @@ -94,6 +94,21 @@ TrexPublisher::publish_event(event_type_e type, const Json::Value &data) { publish_json(s); } +void +TrexPublisher::publish_barrier(uint32_t key) { + Json::FastWriter writer; + Json::Value value; + std::string s; + + value["name"] = "trex-barrier"; + value["type"] = key; + value["data"] = Json::objectValue; + + s = writer.write(value); + publish_json(s); +} + + /** * error handling * diff --git a/src/publisher/trex_publisher.h b/src/publisher/trex_publisher.h index 52978476..f086babb 100644 --- a/src/publisher/trex_publisher.h +++ b/src/publisher/trex_publisher.h @@ -53,8 +53,20 @@ public: }; + /** + * publishes an async event + * + */ virtual void publish_event(event_type_e type, const Json::Value &data = Json::nullValue); + /** + * publishes a barrier requested by the user + * + * @author imarom (17-Jan-16) + * + */ + virtual void publish_barrier(uint32_t key); + private: void show_zmq_last_error(const std::string &err); private: diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index a701f6db..66999144 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -317,3 +317,19 @@ TrexRpcCmdGetPortStatus::_run(const Json::Value ¶ms, Json::Value &result) { return (TREX_RPC_CMD_OK); } +/** + * publish async data now (fast flush) + * + */ +trex_rpc_cmd_rc_e +TrexRpcPublishNow::_run(const Json::Value ¶ms, Json::Value &result) { + TrexStateless *main = get_stateless_obj(); + + uint32_t key = parse_uint32(params, "key", result); + + main->get_platform_api()->publish_async_data_now(key); + + result["result"] = Json::objectValue; + return (TREX_RPC_CMD_OK); + +} diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h index b1750053..081398d1 100644 --- a/src/rpc-server/commands/trex_rpc_cmds.h +++ b/src/rpc-server/commands/trex_rpc_cmds.h @@ -56,6 +56,7 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdTestSub, "test_sub", 2, false); * general cmds */ TREX_RPC_CMD_DEFINE(TrexRpcCmdPing, "ping", 0, false); +TREX_RPC_CMD_DEFINE(TrexRpcPublishNow, "publish_now", 1, false); TREX_RPC_CMD_DEFINE(TrexRpcCmdGetCmds, "get_supported_cmds", 0, false); TREX_RPC_CMD_DEFINE(TrexRpcCmdGetVersion, "get_version", 0, false); diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp index 6e5fbfc6..aee92539 100644 --- a/src/rpc-server/trex_rpc_async_server.cpp +++ b/src/rpc-server/trex_rpc_async_server.cpp @@ -51,6 +51,8 @@ TrexRpcServerAsync::_prepare() { */ void TrexRpcServerAsync::_rpc_thread_cb() { +/* disabled, using the main publisher */ +#if 0 std::stringstream ss; /* create a socket based on the configuration */ @@ -105,6 +107,7 @@ TrexRpcServerAsync::_rpc_thread_cb() { /* must be closed from the same thread */ zmq_close(m_socket); +#endif } void diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp index 82c723b7..5218cd0a 100644 --- a/src/rpc-server/trex_rpc_cmds_table.cpp +++ b/src/rpc-server/trex_rpc_cmds_table.cpp @@ -34,6 +34,7 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() { /* general */ register_command(new TrexRpcCmdPing()); + register_command(new TrexRpcPublishNow()); register_command(new TrexRpcCmdGetCmds()); register_command(new TrexRpcCmdGetVersion()); register_command(new TrexRpcCmdGetSysInfo()); diff --git a/src/sim/trex_sim_stateless.cpp b/src/sim/trex_sim_stateless.cpp index 215315e0..13f264cf 100644 --- a/src/sim/trex_sim_stateless.cpp +++ b/src/sim/trex_sim_stateless.cpp @@ -97,6 +97,10 @@ public: } } + virtual void publish_async_data_now(uint32_t key) const { + + } + private: int m_dp_core_count; }; -- cgit 1.2.3-korg