summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2016-01-18 06:59:36 -0500
committerimarom <imarom@cisco.com>2016-01-21 10:11:54 -0500
commit9932ff8dcf4f8b6b6f3986832f8a1a8f8461c743 (patch)
tree28c577725377131eafbd54319407066dcbd385a8
parent11d328d3e40b04540489eec83ac484d5b06254bb (diff)
async publish now
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py53
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py11
-rwxr-xr-xscripts/automation/trex_control_plane/client_utils/jsonrpc_client.py7
-rw-r--r--scripts/automation/trex_control_plane/common/trex_types.py18
-rwxr-xr-xscripts/automation/trex_control_plane/console/trex_console.py49
-rw-r--r--scripts/stl_test_api/__init__.py38
-rw-r--r--scripts/stl_test_example.py54
-rw-r--r--src/internal_api/trex_platform_api.h3
-rwxr-xr-xsrc/main_dpdk.cpp113
-rw-r--r--src/mock/trex_platform_api_mock.cpp1
-rw-r--r--src/publisher/trex_publisher.cpp15
-rw-r--r--src/publisher/trex_publisher.h12
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_general.cpp16
-rw-r--r--src/rpc-server/commands/trex_rpc_cmds.h1
-rw-r--r--src/rpc-server/trex_rpc_async_server.cpp3
-rw-r--r--src/rpc-server/trex_rpc_cmds_table.cpp1
-rw-r--r--src/sim/trex_sim_stateless.cpp4
17 files changed, 280 insertions, 119 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
index 66e65a32..2bb0e9cd 100644
--- a/scripts/automation/trex_control_plane/client/trex_async_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -16,6 +16,7 @@ import time
import datetime
import zmq
import re
+import random
from common.trex_stats import *
from common.trex_streams import *
@@ -155,6 +156,7 @@ class CTRexAsyncClient():
self.stats = CTRexAsyncStatsManager()
self.last_data_recv_ts = 0
+ self.async_barriers = []
self.connected = False
@@ -188,17 +190,16 @@ class CTRexAsyncClient():
self.connected = True
-
- # wait for data streaming from the server
- timeout = time.time() + 5
- while not self.is_alive():
- time.sleep(0.01)
- if time.time() > timeout:
- self.disconnect()
- return RC_ERR("*** [subscriber] - no data flow from server at : " + self.tr)
+ # send a barrier and wait for ack
+ rc = self.block_on_stats()
+ if not rc:
+ self.disconnect()
+ return rc
return RC_OK()
+
+
# disconnect
def disconnect (self):
@@ -284,10 +285,46 @@ class CTRexAsyncClient():
# stats
if name == "trex-global":
self.stateless_client.handle_async_stats_update(data)
+
# events
elif name == "trex-event":
self.stateless_client.handle_async_event(type, data)
+
+ # barriers
+ elif name == "trex-barrier":
+ self.handle_async_barrier(type, data)
else:
pass
+ # async barrier handling routine
+ def handle_async_barrier (self, type, data):
+
+ for b in self.async_barriers:
+ if b['key'] == type:
+ b['ack'] = True
+
+
+ # force update a new snapshot from the server
+ def block_on_stats(self, timeout = 5):
+
+ # set a random key
+ key = random.getrandbits(32)
+ barrier = {'key': key, 'ack' : False}
+
+ # add to the queue
+ self.async_barriers.append(barrier)
+
+ rc = self.stateless_client.transmit("publish_now", params = {'key' : key})
+ if not rc:
+ return rc
+
+ expr = time.time() + timeout
+ while not barrier['ack']:
+ time.sleep(0.001)
+ if time.time() > expr:
+ return RC_ERR("*** [subscriber] - timeout - no data flow from server at : " + self.tr)
+
+ return RC_OK()
+
+
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index b7700c7c..105c4d01 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -55,12 +55,16 @@ class LoggerApi(object):
return (self.level >= level)
+ # simple log message with verbose
def log (self, msg, level = VERBOSE_REGULAR, newline = True):
if not self.check_verbose(level):
return
self.write(msg, newline)
+ # annotates an action with a RC - writes to log the result
+ def annotate (self, rc, desc = None, show_status = True):
+ rc.annotate(self.log, desc, show_status)
# default logger - to stdout
class DefaultLogger(LoggerApi):
@@ -82,7 +86,7 @@ class CTRexStatelessClient(object):
server = "localhost",
sync_port = 4501,
async_port = 4500,
- quiet = False,
+ verbose_level = LoggerApi.VERBOSE_REGULAR,
virtual = False,
logger = None):
@@ -90,13 +94,14 @@ class CTRexStatelessClient(object):
self.user = username
+ # logger
if not logger:
self.logger = DefaultLogger()
else:
self.logger = logger
- if quiet:
- self.logger.set_verbose(self.logger.VERBOSE_QUIET)
+ # initial verbose
+ self.logger.set_verbose(verbose_level)
self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual, self.logger)
diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
index a5e01340..e08f5d69 100755
--- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
+++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
@@ -124,7 +124,7 @@ class JsonRpcClient(object):
break
except zmq.Again:
tries += 1
- if tries > 10:
+ if tries > 5:
self.disconnect()
return RC_ERR("*** [RPC] - Failed to send message to server")
@@ -136,9 +136,9 @@ class JsonRpcClient(object):
break
except zmq.Again:
tries += 1
- if tries > 10:
+ if tries > 5:
self.disconnect()
- return RC_ERR("*** [RPC] - Failed to get server response")
+ return RC_ERR("*** [RPC] - Failed to get server response at {0}".format(self.transport))
self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n")
@@ -173,6 +173,7 @@ class JsonRpcClient(object):
else:
return RC_ERR(response_json["error"]["message"])
+
# if no error there should be a result
if ("result" not in response_json):
return RC_ERR("Malformed Response ({0})".format(str(response_json)))
diff --git a/scripts/automation/trex_control_plane/common/trex_types.py b/scripts/automation/trex_control_plane/common/trex_types.py
index 21deffd2..5c29f59b 100644
--- a/scripts/automation/trex_control_plane/common/trex_types.py
+++ b/scripts/automation/trex_control_plane/common/trex_types.py
@@ -17,7 +17,7 @@ class RC():
def __init__ (self, rc = None, data = None):
self.rc_list = []
- if (rc != None) and (data != None):
+ if (rc != None):
tuple_rc = namedtuple('RC', ['rc', 'data'])
self.rc_list.append(tuple_rc(rc, data))
@@ -35,16 +35,26 @@ class RC():
def data (self):
d = [x.data if x.rc else "" for x in self.rc_list]
- return (d if len(d) > 1 else d[0])
+ return (d if len(d) != 1 else d[0])
def err (self):
e = [x.data if not x.rc else "" for x in self.rc_list]
- return (e if len(e) > 1 else e[0])
+ return (e if len(e) != 1 else e[0])
def __str__ (self):
return str(self.data()) if self else str(self.err())
- def annotate (self, log_func, desc = None, show_status = True):
+ def prn_func (self, msg, newline = True):
+ if newline:
+ print msg
+ else:
+ print msg,
+
+ def annotate (self, log_func = None, desc = None, show_status = True):
+
+ if not log_func:
+ log_func = self.prn_func
+
if desc:
log_func(format_text('\n{:<60}'.format(desc), 'bold'), newline = False)
else:
diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py
index 2ae92fb6..f086c208 100755
--- a/scripts/automation/trex_control_plane/console/trex_console.py
+++ b/scripts/automation/trex_control_plane/console/trex_console.py
@@ -29,7 +29,7 @@ import sys
import tty, termios
import trex_root_path
from common.trex_streams import *
-from client.trex_stateless_client import CTRexStatelessClient
+from client.trex_stateless_client import CTRexStatelessClient, LoggerApi
from common.text_opts import *
from client_utils.general_utils import user_input, get_current_user
from client_utils import parsing_opts
@@ -266,36 +266,6 @@ class TRexConsole(TRexGeneralCmd):
return targets
- # annotation method
- @staticmethod
- def annotate (desc, rc = None, err_log = None, ext_err_msg = None):
- print format_text('\n{:<40}'.format(desc), 'bold'),
- if rc == None:
- print "\n"
- return
-
- if rc == False:
- # do we have a complex log object ?
- if isinstance(err_log, list):
- print ""
- for func in err_log:
- if func:
- print func
- print ""
-
- elif isinstance(err_log, str):
- print "\n" + err_log + "\n"
-
- print format_text("[FAILED]\n", 'red', 'bold')
- if ext_err_msg:
- print format_text(ext_err_msg + "\n", 'blue', 'bold')
-
- return False
-
- else:
- print format_text("[SUCCESS]\n", 'green', 'bold')
- return True
-
####################### shell commands #######################
@verify_connected
@@ -667,10 +637,19 @@ def main():
options = parser.parse_args()
# Stateless client connection
- stateless_client = CTRexStatelessClient(options.user, options.server, options.port, options.pub, options.quiet)
+ if options.quiet:
+ verbose_level = LoggerApi.VERBOSE_QUIET
+ elif options.verbose:
+ verbose_level = LoggerApi.VERBOSE_HIGH
+ else:
+ verbose_level = LoggerApi.VERBOSE_REGULAR
- if not options.quiet:
- print "\nlogged as {0}".format(format_text(options.user, 'bold'))
+ # Stateless client connection
+ stateless_client = CTRexStatelessClient(options.user,
+ options.server,
+ options.port,
+ options.pub,
+ verbose_level)
# TUI or no acquire will give us READ ONLY mode
if options.tui or not options.acquire:
@@ -679,7 +658,7 @@ def main():
rc = stateless_client.connect("RW")
# unable to connect - bye
- if rc.bad():
+ if not rc:
rc.annotate()
return
diff --git a/scripts/stl_test_api/__init__.py b/scripts/stl_test_api/__init__.py
index 6df72136..7381d88e 100644
--- a/scripts/stl_test_api/__init__.py
+++ b/scripts/stl_test_api/__init__.py
@@ -1,3 +1,9 @@
+#
+# TRex stateless API
+# provides a layer for communicating with TRex
+# using Python API
+#
+
import sys
import os
import time
@@ -43,24 +49,29 @@ class BasicTestAPI(object):
# main object
def __init__ (self, server = "localhost", sync_port = 4501, async_port = 4500):
self.logger = BasicTestAPI.Logger()
- self.client = CTRexStatelessClient(logger = self.logger, sync_port = sync_port, async_port = async_port)
+ self.client = CTRexStatelessClient(logger = self.logger,
+ sync_port = sync_port,
+ async_port = async_port,
+ verbose_level = LoggerApi.VERBOSE_REGULAR)
+
+ self.__invalid_stats = True
+ # connect to the stateless client
def connect (self):
rc = self.client.connect(mode = "RWF")
self.__verify_rc(rc)
+ # disconnect from the stateless client
def disconnect (self):
self.client.disconnect()
- def __verify_rc (self, rc):
- if rc.bad():
- raise self.Failure(rc)
-
def inject_profile (self, filename, rate = "1", duration = None):
- cmd = "-f {0} -m {1}".format(filename, rate)
+ self.__invalid_stats = True
+
+ cmd = "--total -f {0} -m {1}".format(filename, rate)
if duration:
cmd += " -d {0}".format(duration)
@@ -74,9 +85,24 @@ class BasicTestAPI(object):
def get_stats (self):
+ if self.__invalid_stats:
+ # send a barrier
+ rc = self.client.block_on_stats()
+ self.__verify_rc(rc)
+ self.__invalid_stats = False
+
total_stats = trex_stats.CPortStats(None)
for port in self.client.ports.values():
total_stats += port.port_stats
return total_stats
+
+
+ # some internal methods
+
+ # verify RC return value
+ def __verify_rc (self, rc):
+ if not rc:
+ raise self.Failure(rc)
+
diff --git a/scripts/stl_test_example.py b/scripts/stl_test_example.py
index 689ed316..7974758d 100644
--- a/scripts/stl_test_example.py
+++ b/scripts/stl_test_example.py
@@ -1,24 +1,42 @@
-from stl_test_api import BasicTestAPI
-x = BasicTestAPI()
+# simple test that uses simple API with stateless TRex
+#from stl_test_api import BasicTestAPI
+api_path = os.path.dirname(os.path.abspath(__file__))
+sys.path.insert(0, os.path.join(api_path, '../automation/trex_control_plane/client/'))
-try:
- x.connect()
-
- s = x.get_stats()
-
- print "input packets before test: %s" % s['ipackets']
-
- x.inject_profile("stl/imix_1pkt.yaml", rate = "5gbps", duration = 1)
- x.wait_while_traffic_on()
-
- s = x.get_stats()
-
- print "input packets after test: %s" % s['ipackets']
-
- print "Test passed :-)\n"
+from trex_stateless_client import CTRexStatelessClient, LoggerApi
+c = CTRexStatelessClient()
+try:
+ c.connect()
+ #before_ipackets = x.get_stats().get_rel('ipackets')
+ c.cmd_start_line("-f stl/imix_1pkt.yaml -m 5mpps -d 1")
+ c.cmd_wait_on_traffic()
finally:
- x.disconnect()
+ c.disconnect()
+
+#x = BasicTestAPI()
+#
+#try:
+# x.connect()
+#
+# before_ipackets = x.get_stats().get_rel('ipackets')
+#
+# print "input packets before test: %s" % before_ipackets
+#
+# x.inject_profile("stl/imix_1pkt.yaml", rate = "5mpps", duration = 1)
+# x.wait_while_traffic_on()
+#
+# after_ipackets = x.get_stats().get_rel('ipackets')
+#
+# print "input packets after test: %s" % after_ipackets
+#
+# if (after_ipackets - before_ipackets) == 5000000:
+# print "Test passed :-)\n"
+# else:
+# print "Test failed :-(\n"
+#
+#finally:
+# x.disconnect()
diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h
index 3ae49da8..f8bc10d5 100644
--- a/src/internal_api/trex_platform_api.h
+++ b/src/internal_api/trex_platform_api.h
@@ -110,6 +110,7 @@ public:
virtual void get_global_stats(TrexPlatformGlobalStats &stats) const = 0;
virtual void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const = 0;
virtual void get_interface_info(uint8_t interface_id, std::string &driver_name, driver_speed_e &speed) const = 0;
+ virtual void publish_async_data_now(uint32_t key) const = 0;
virtual uint8_t get_dp_core_count() const = 0;
virtual ~TrexPlatformApi() {}
@@ -127,6 +128,7 @@ public:
void get_global_stats(TrexPlatformGlobalStats &stats) const;
void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const;
void get_interface_info(uint8_t interface_id, std::string &driver_name, driver_speed_e &speed) const;
+ void publish_async_data_now(uint32_t key) const;
uint8_t get_dp_core_count() const;
};
@@ -146,6 +148,7 @@ public:
speed = SPEED_INVALID;
}
+ void publish_async_data_now(uint32_t key) const {}
uint8_t get_dp_core_count() const;
};
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index 7c25b2e2..a5e87172 100755
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -2609,8 +2609,12 @@ private:
bool is_all_cores_finished();
public:
+
+ void publish_async_data();
+ void publish_async_barrier(uint32_t key);
+
void dump_stats(FILE *fd,
- std::string & json,CGlobalStats::DumpFormat format);
+ CGlobalStats::DumpFormat format);
void dump_template_info(std::string & json);
bool sanity_check();
void update_stats(void);
@@ -2649,6 +2653,7 @@ private:
CLatencyVmPort m_latency_vm_vports[BP_MAX_PORTS]; /* vm driver */
CLatencyPktInfo m_latency_pkt;
TrexPublisher m_zmq_publisher;
+ CGlobalStats m_stats;
public:
TrexStateless *m_trex_stateless;
@@ -3448,11 +3453,11 @@ void CGlobalTRex::dump_template_info(std::string & json){
json+="]}" ;
}
-void CGlobalTRex::dump_stats(FILE *fd,std::string & json,
- CGlobalStats::DumpFormat format){
- CGlobalStats stats;
+void CGlobalTRex::dump_stats(FILE *fd, CGlobalStats::DumpFormat format){
+
update_stats();
- get_stats(stats);
+ get_stats(m_stats);
+
if (format==CGlobalStats::dmpTABLE) {
if ( m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ){
switch (m_io_modes.m_pp_mode ){
@@ -3461,11 +3466,11 @@ void CGlobalTRex::dump_stats(FILE *fd,std::string & json,
break;
case CTrexGlobalIoMode::ppTABLE:
fprintf(fd,"\n-Per port stats table \n");
- stats.Dump(fd,CGlobalStats::dmpTABLE);
+ m_stats.Dump(fd,CGlobalStats::dmpTABLE);
break;
case CTrexGlobalIoMode::ppSTANDARD:
fprintf(fd,"\n-Per port stats - standard\n");
- stats.Dump(fd,CGlobalStats::dmpSTANDARD);
+ m_stats.Dump(fd,CGlobalStats::dmpSTANDARD);
break;
};
@@ -3475,22 +3480,62 @@ void CGlobalTRex::dump_stats(FILE *fd,std::string & json,
break;
case CTrexGlobalIoMode::apENABLE:
fprintf(fd,"\n-Global stats enabled \n");
- stats.DumpAllPorts(fd);
+ m_stats.DumpAllPorts(fd);
break;
};
}
}else{
/* at exit , always need to dump it in standartd mode for scripts*/
- stats.Dump(fd,format);
- stats.DumpAllPorts(fd);
+ m_stats.Dump(fd,format);
+ m_stats.DumpAllPorts(fd);
}
- stats.dump_json(json);
+
+}
+
+
+void
+CGlobalTRex::publish_async_data() {
+ std::string json;
+
+ m_stats.dump_json(json);
+ m_zmq_publisher.publish_json(json);
+
+ /* generator json , all cores are the same just sample the first one */
+ m_fl.m_threads_info[0]->m_node_gen.dump_json(json);
+ m_zmq_publisher.publish_json(json);
+
+
+ if ( !get_is_stateless() ){
+ dump_template_info(json);
+ m_zmq_publisher.publish_json(json);
+ }
+
+ if ( get_is_rx_check_mode() ) {
+ m_mg.rx_check_dump_json(json );
+ m_zmq_publisher.publish_json(json);
+ }
+
+ /* backward compatible */
+ m_mg.dump_json(json );
+ m_zmq_publisher.publish_json(json);
+
+ /* more info */
+ m_mg.dump_json_v2(json );
+ m_zmq_publisher.publish_json(json);
+
+ /* stateless info - nothing for now */
+ //m_trex_stateless->generate_publish_snapshot(json);
+ //m_zmq_publisher.publish_json(json);
}
+void
+CGlobalTRex::publish_async_barrier(uint32_t key) {
+ m_zmq_publisher.publish_barrier(key);
+}
-int CGlobalTRex::run_in_master(){
+int CGlobalTRex::run_in_master() {
- std::string json;
+
bool was_stopped=false;
if ( get_is_stateless() ) {
@@ -3530,7 +3575,7 @@ int CGlobalTRex::run_in_master(){
m_io_modes.DumpHelp(stdout);
}
- dump_stats(stdout,json,CGlobalStats::dmpTABLE);
+ dump_stats(stdout,CGlobalStats::dmpTABLE);
if (m_io_modes.m_g_mode == CTrexGlobalIoMode::gNORMAL ) {
fprintf (stdout," current time : %.1f sec \n",now_sec());
@@ -3542,16 +3587,6 @@ int CGlobalTRex::run_in_master(){
fprintf (stdout," test duration : %.1f sec \n",d);
}
- m_zmq_publisher.publish_json(json);
-
- /* generator json , all cores are the same just sample the first one */
- m_fl.m_threads_info[0]->m_node_gen.dump_json(json);
- m_zmq_publisher.publish_json(json);
-
- if ( !get_is_stateless() ){
- dump_template_info(json);
- m_zmq_publisher.publish_json(json);
- }
if ( !CGlobalInfo::m_options.is_latency_disabled() ){
m_mg.update();
@@ -3591,24 +3626,12 @@ int CGlobalTRex::run_in_master(){
}
- if ( get_is_rx_check_mode() ) {
- m_mg.rx_check_dump_json(json );
- m_zmq_publisher.publish_json(json);
- }
-
- /* backward compatible */
- m_mg.dump_json(json );
- m_zmq_publisher.publish_json(json);
-
- /* more info */
- m_mg.dump_json_v2(json );
- m_zmq_publisher.publish_json(json);
+
}
- /* stateless info */
- m_trex_stateless->generate_publish_snapshot(json);
- m_zmq_publisher.publish_json(json);
+ /* publish data */
+ publish_async_data();
/* check from messages from DP */
check_for_dp_messages();
@@ -3679,11 +3702,10 @@ int CGlobalTRex::run_in_core(virtual_thread_id_t virt_core_id){
int CGlobalTRex::stop_master(){
delay(1000);
- std::string json;
fprintf(stdout," ==================\n");
fprintf(stdout," interface sum \n");
fprintf(stdout," ==================\n");
- dump_stats(stdout,json,CGlobalStats::dmpSTANDARD);
+ dump_stats(stdout,CGlobalStats::dmpSTANDARD);
fprintf(stdout," ==================\n");
fprintf(stdout," \n\n");
@@ -3724,7 +3746,7 @@ int CGlobalTRex::stop_master(){
m_mg.DumpRxCheckVerification(stdout,total_tx_rx_check);
}
- dump_stats(stdout,json,CGlobalStats::dmpSTANDARD);
+ dump_stats(stdout,CGlobalStats::dmpSTANDARD);
dump_post_test_stats(stdout);
m_fl.Delete();
@@ -4899,3 +4921,10 @@ TrexDpdkPlatformApi::get_interface_info(uint8_t interface_id,
driver_name = CTRexExtendedDriverDb::Ins()->get_driver_name();
speed = CTRexExtendedDriverDb::Ins()->get_drv()->get_driver_speed();
}
+
+void
+TrexDpdkPlatformApi::publish_async_data_now(uint32_t key) const {
+ g_trex.publish_async_data();
+ g_trex.publish_async_barrier(key);
+}
+
diff --git a/src/mock/trex_platform_api_mock.cpp b/src/mock/trex_platform_api_mock.cpp
index 416c4b69..7cacd96c 100644
--- a/src/mock/trex_platform_api_mock.cpp
+++ b/src/mock/trex_platform_api_mock.cpp
@@ -51,3 +51,4 @@ void
TrexMockPlatformApi::port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list) const {
cores_id_list.push_back(std::make_pair(0, 0));
}
+
diff --git a/src/publisher/trex_publisher.cpp b/src/publisher/trex_publisher.cpp
index 35653069..f56d56df 100644
--- a/src/publisher/trex_publisher.cpp
+++ b/src/publisher/trex_publisher.cpp
@@ -94,6 +94,21 @@ TrexPublisher::publish_event(event_type_e type, const Json::Value &data) {
publish_json(s);
}
+void
+TrexPublisher::publish_barrier(uint32_t key) {
+ Json::FastWriter writer;
+ Json::Value value;
+ std::string s;
+
+ value["name"] = "trex-barrier";
+ value["type"] = key;
+ value["data"] = Json::objectValue;
+
+ s = writer.write(value);
+ publish_json(s);
+}
+
+
/**
* error handling
*
diff --git a/src/publisher/trex_publisher.h b/src/publisher/trex_publisher.h
index 52978476..f086babb 100644
--- a/src/publisher/trex_publisher.h
+++ b/src/publisher/trex_publisher.h
@@ -53,8 +53,20 @@ public:
};
+ /**
+ * publishes an async event
+ *
+ */
virtual void publish_event(event_type_e type, const Json::Value &data = Json::nullValue);
+ /**
+ * publishes a barrier requested by the user
+ *
+ * @author imarom (17-Jan-16)
+ *
+ */
+ virtual void publish_barrier(uint32_t key);
+
private:
void show_zmq_last_error(const std::string &err);
private:
diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
index a701f6db..66999144 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
@@ -317,3 +317,19 @@ TrexRpcCmdGetPortStatus::_run(const Json::Value &params, Json::Value &result) {
return (TREX_RPC_CMD_OK);
}
+/**
+ * publish async data now (fast flush)
+ *
+ */
+trex_rpc_cmd_rc_e
+TrexRpcPublishNow::_run(const Json::Value &params, Json::Value &result) {
+ TrexStateless *main = get_stateless_obj();
+
+ uint32_t key = parse_uint32(params, "key", result);
+
+ main->get_platform_api()->publish_async_data_now(key);
+
+ result["result"] = Json::objectValue;
+ return (TREX_RPC_CMD_OK);
+
+}
diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h
index b1750053..081398d1 100644
--- a/src/rpc-server/commands/trex_rpc_cmds.h
+++ b/src/rpc-server/commands/trex_rpc_cmds.h
@@ -56,6 +56,7 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdTestSub, "test_sub", 2, false);
* general cmds
*/
TREX_RPC_CMD_DEFINE(TrexRpcCmdPing, "ping", 0, false);
+TREX_RPC_CMD_DEFINE(TrexRpcPublishNow, "publish_now", 1, false);
TREX_RPC_CMD_DEFINE(TrexRpcCmdGetCmds, "get_supported_cmds", 0, false);
TREX_RPC_CMD_DEFINE(TrexRpcCmdGetVersion, "get_version", 0, false);
diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp
index 6e5fbfc6..aee92539 100644
--- a/src/rpc-server/trex_rpc_async_server.cpp
+++ b/src/rpc-server/trex_rpc_async_server.cpp
@@ -51,6 +51,8 @@ TrexRpcServerAsync::_prepare() {
*/
void
TrexRpcServerAsync::_rpc_thread_cb() {
+/* disabled, using the main publisher */
+#if 0
std::stringstream ss;
/* create a socket based on the configuration */
@@ -105,6 +107,7 @@ TrexRpcServerAsync::_rpc_thread_cb() {
/* must be closed from the same thread */
zmq_close(m_socket);
+#endif
}
void
diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp
index 82c723b7..5218cd0a 100644
--- a/src/rpc-server/trex_rpc_cmds_table.cpp
+++ b/src/rpc-server/trex_rpc_cmds_table.cpp
@@ -34,6 +34,7 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() {
/* general */
register_command(new TrexRpcCmdPing());
+ register_command(new TrexRpcPublishNow());
register_command(new TrexRpcCmdGetCmds());
register_command(new TrexRpcCmdGetVersion());
register_command(new TrexRpcCmdGetSysInfo());
diff --git a/src/sim/trex_sim_stateless.cpp b/src/sim/trex_sim_stateless.cpp
index 215315e0..13f264cf 100644
--- a/src/sim/trex_sim_stateless.cpp
+++ b/src/sim/trex_sim_stateless.cpp
@@ -97,6 +97,10 @@ public:
}
}
+ virtual void publish_async_data_now(uint32_t key) const {
+
+ }
+
private:
int m_dp_core_count;
};